This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8603164cc6 Improve realtime Lucene text index freshness/cpu/disk io usage (#13503)
8603164cc6 is described below

commit 8603164cc646dcb4bc2a6ead51cc9ad37441fda7
Author: Christopher Peck <27231838+itschrisp...@users.noreply.github.com>
AuthorDate: Fri Jul 26 14:04:13 2024 -0700

    Improve realtime Lucene text index freshness/cpu/disk io usage (#13503)
    
    * enable parallel background refresh for lucene text index
    
    * lint
    
    * address comments
---
 .../indexsegment/mutable/MutableSegmentImpl.java   |  35 ---
 .../invertedindex/LuceneNRTCachingMergePolicy.java |  64 +++++
 .../RealtimeLuceneIndexReaderRefreshThread.java    | 153 ------------
 .../RealtimeLuceneIndexRefreshManager.java         | 268 +++++++++++++++++++++
 .../RealtimeLuceneIndexRefreshState.java           | 139 -----------
 .../invertedindex/RealtimeLuceneTextIndex.java     |  28 ++-
 .../creator/impl/text/LuceneTextIndexCreator.java  |  13 +-
 .../converter/RealtimeSegmentConverterTest.java    |   2 +
 .../invertedindex/LuceneMutableTextIndexTest.java  |  19 +-
 .../NativeAndLuceneMutableTextIndexTest.java       |   3 +-
 .../RealtimeLuceneIndexRefreshManagerTest.java     | 160 ++++++++++++
 .../server/starter/helix/BaseServerStarter.java    |  19 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   5 +
 13 files changed, 554 insertions(+), 354 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 58d02af962..847f589f2a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -51,8 +51,6 @@ import 
org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import 
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
-import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
-import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector;
 import 
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
 import 
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
@@ -159,8 +157,6 @@ public class MutableSegmentImpl implements MutableSegment {
   private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
   private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
 
-  private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders 
_realtimeLuceneReaders;
-
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
 
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
@@ -347,17 +343,6 @@ public class MutableSegmentImpl implements MutableSegment {
         }
       }
 
-      // TODO - this logic is in the wrong place and belongs in a 
Lucene-specific submodule,
-      //  it is beyond the scope of realtime index pluggability to do this 
refactoring, so realtime
-      //  text indexes remain statically defined. Revisit this after this 
refactoring has been done.
-      MutableIndex textIndex = mutableIndexes.get(StandardIndexes.text());
-      if (textIndex instanceof RealtimeLuceneTextIndex) {
-        if (_realtimeLuceneReaders == null) {
-          _realtimeLuceneReaders = new 
RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName);
-        }
-        _realtimeLuceneReaders.addReader((RealtimeLuceneTextIndex) textIndex);
-      }
-
       Pair<String, ValueAggregator> columnAggregatorPair =
           metricsAggregators.getOrDefault(column, Pair.of(column, null));
       String sourceColumn = columnAggregatorPair.getLeft();
@@ -368,13 +353,6 @@ public class MutableSegmentImpl implements MutableSegment {
               nullValueVector, sourceColumn, valueAggregator));
     }
 
-    // TODO separate concerns: this logic does not belong here
-    if (_realtimeLuceneReaders != null) {
-      // add the realtime lucene index readers to the global queue for refresh 
task to pick up
-      RealtimeLuceneIndexRefreshState realtimeLuceneIndexRefreshState = 
RealtimeLuceneIndexRefreshState.getInstance();
-      
realtimeLuceneIndexRefreshState.addRealtimeReadersToQueue(_realtimeLuceneReaders);
-    }
-
     _partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();
 
     _partitionUpsertMetadataManager = 
config.getPartitionUpsertMetadataManager();
@@ -993,19 +971,6 @@ public class MutableSegmentImpl implements MutableSegment {
       }
     }
 
-    // Stop the text index refresh before closing the indexes
-    if (_realtimeLuceneReaders != null) {
-      // set this to true as a way of signalling the refresh task thread to
-      // not attempt refresh on this segment here onwards
-      _realtimeLuceneReaders.getLock().lock();
-      try {
-        _realtimeLuceneReaders.setSegmentDestroyed();
-        _realtimeLuceneReaders.clearRealtimeReaderList();
-      } finally {
-        _realtimeLuceneReaders.getLock().unlock();
-      }
-    }
-
     // Close the indexes
     for (IndexContainer indexContainer : _indexContainerMap.values()) {
       indexContainer.close();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneNRTCachingMergePolicy.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneNRTCachingMergePolicy.java
new file mode 100644
index 0000000000..ab811d8593
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneNRTCachingMergePolicy.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.lucene.index.MergeTrigger;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.NRTCachingDirectory;
+
+
+/**
+ * LuceneNRTCachingMergePolicy is a best-effort policy to generate merges for segments that are fully in memory,
+ * at the time of SegmentInfo selection. It does not consider segments that have been flushed to disk eligible
+ * for merging.
+ * <p>
+ * Each refresh creates a small Lucene segment. Increasing the frequency of refreshes to reduce indexing lag results
+ * in a large number of small segments, and high disk IO ops for merging them. By using this best-effort merge policy
+ * the small ops can be avoided since the segments are in memory when merged.
+ */
+public class LuceneNRTCachingMergePolicy extends TieredMergePolicy {
+  private final NRTCachingDirectory _nrtCachingDirectory;
+
+  /**
+   * @param nrtCachingDirectory directory whose cached file list ({@link NRTCachingDirectory#listCachedFiles()}) is
+   *                            used to decide which segments are still fully in memory
+   */
+  public LuceneNRTCachingMergePolicy(NRTCachingDirectory nrtCachingDirectory) {
+    _nrtCachingDirectory = nrtCachingDirectory;
+  }
+
+  /**
+   * Delegates to {@link TieredMergePolicy#findMerges} with only those segments whose files are all currently listed
+   * as cached by the NRT caching directory.
+   */
+  @Override
+  public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
+      throws IOException {
+    SegmentInfos inMemorySegmentInfos = new SegmentInfos(segmentInfos.getIndexCreatedVersionMajor());
+    // Snapshot of the file names currently held in the NRT cache
+    Set<String> cachedFiles = new HashSet<>(List.of(_nrtCachingDirectory.listCachedFiles()));
+    for (SegmentCommitInfo info : segmentInfos) {
+      // A segment is eligible only if every one of its files is still cached. (A 'break' out of an inner file loop
+      // would only skip the remaining files, not the segment, and would leave flushed segments eligible.)
+      if (cachedFiles.containsAll(info.files())) {
+        inMemorySegmentInfos.add(info);
+      }
+    }
+    return super.findMerges(mergeTrigger, inMemorySegmentInfos, mergeContext);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java
deleted file mode 100644
index 13110b4f32..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.realtime.impl.invertedindex;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import org.apache.lucene.search.SearcherManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Background thread to refresh the realtime lucene index readers for 
supporting
- * near-realtime text search. The task maintains a queue of realtime segments.
- * This queue is global (across all realtime segments of all realtime/hybrid 
tables).
- *
- * Each element in the queue is of type {@link 
RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders}.
- * It encapsulates a lock and all the realtime lucene readers for the 
particular realtime segment.
- * Since text index is also create on a per column basis, there will be as 
many realtime lucene
- * readers as the number of columns with text search enabled.
- *
- * Between each successive execution of the task, there is a fixed delay 
(regardless of how long
- * each execution took). When the task wakes up, it pick the 
RealtimeLuceneReadersForRealtimeSegment
- * from the head of queue, refresh it's readers and adds this at the tail of 
queue.
- */
-public class RealtimeLuceneIndexReaderRefreshThread implements Runnable {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeLuceneIndexReaderRefreshThread.class);
-  // TODO: make this configurable and choose a higher default value
-  private static final int DELAY_BETWEEN_SUCCESSIVE_EXECUTION_MS_DEFAULT = 10;
-
-  private final 
ConcurrentLinkedQueue<RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders> 
_luceneRealtimeReaders;
-  private final Lock _mutex;
-  private final Condition _conditionVariable;
-
-  private volatile boolean _stopped = false;
-
-  RealtimeLuceneIndexReaderRefreshThread(
-      
ConcurrentLinkedQueue<RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders> 
luceneRealtimeReaders, Lock mutex,
-      Condition conditionVariable) {
-    _luceneRealtimeReaders = luceneRealtimeReaders;
-    _mutex = mutex;
-    _conditionVariable = conditionVariable;
-  }
-
-  void setStopped() {
-    _stopped = true;
-  }
-
-  @Override
-  public void run() {
-    while (!_stopped) {
-      _mutex.lock();
-      try {
-        // During instantiation of a given MutableSegmentImpl, we will signal 
on this condition variable once
-        // one or more realtime lucene readers (one per column) belonging to 
the MutableSegment
-        // are added to the global queue managed by this thread. The thread 
that signals will
-        // grab this mutex and signal on the condition variable.
-        //
-        // This refresh thread will be woken up (and grab the mutex 
automatically as per the
-        // implementation of await) and check if the queue is non-empty. It 
will then proceed to
-        // poll the queue and refresh the realtime index readers for the 
polled segment.
-        //
-        // The mutex and condition-variable semantics take care of the 
scenario when on
-        // a given Pinot server, there is no realtime segment with text index 
enabled. In such
-        // cases, there is no need for this thread to wake up simply after 
every few seconds/minutes
-        // only to find that there is nothing to be refreshed. The thread 
should simply be
-        // off CPU until signalled specifically. This also covers the 
situation where initially
-        // there were few realtime segments of a table with text index. Later 
if they got
-        // moved to another server as part of rebalance, then again there is 
no need for this thread
-        // to do anything until some realtime segment is created with text 
index enabled.
-        while (_luceneRealtimeReaders.isEmpty()) {
-          _conditionVariable.await();
-        }
-      } catch (InterruptedException e) {
-        LOGGER.warn("Realtime lucene reader refresh thread got interrupted 
while waiting on condition variable: ", e);
-        Thread.currentThread().interrupt();
-      } finally {
-        _mutex.unlock();
-      }
-
-      // check if shutdown has been initiated
-      if (_stopped) {
-        // exit
-        break;
-      }
-
-      // remove the realtime segment from the front of queue
-      RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders 
realtimeReadersForSegment = _luceneRealtimeReaders.poll();
-      if (realtimeReadersForSegment != null) {
-        String segmentName = realtimeReadersForSegment.getSegmentName();
-        // take the lock to prevent the realtime segment from being 
concurrently destroyed
-        // and thus closing the realtime readers while this thread attempts to 
refresh them
-        realtimeReadersForSegment.getLock().lock();
-        try {
-          if (!realtimeReadersForSegment.isSegmentDestroyed()) {
-            // if the segment hasn't yet been destroyed, refresh each
-            // realtime reader (one per column with text index enabled)
-            // for this segment.
-            List<RealtimeLuceneTextIndex> realtimeLuceneReaders =
-                realtimeReadersForSegment.getRealtimeLuceneReaders();
-            for (RealtimeLuceneTextIndex realtimeReader : 
realtimeLuceneReaders) {
-              if (_stopped) {
-                // exit
-                break;
-              }
-              SearcherManager searcherManager = 
realtimeReader.getSearcherManager();
-              try {
-                searcherManager.maybeRefresh();
-              } catch (Exception e) {
-                // we should never be here since the locking semantics between 
MutableSegmentImpl::destroy()
-                // and this code along with volatile state 
"isSegmentDestroyed" protect against the cases
-                // where this thread might attempt to refresh a realtime 
lucene reader after it has already
-                // been closed duing segment destroy.
-                LOGGER.warn("Caught exception {} while refreshing realtime 
lucene reader for segment: {}", e,
-                    segmentName);
-              }
-            }
-          }
-        } finally {
-          if (!realtimeReadersForSegment.isSegmentDestroyed()) {
-            _luceneRealtimeReaders.offer(realtimeReadersForSegment);
-          }
-          realtimeReadersForSegment.getLock().unlock();
-        }
-      }
-
-      try {
-        Thread.sleep(DELAY_BETWEEN_SUCCESSIVE_EXECUTION_MS_DEFAULT);
-      } catch (Exception e) {
-        LOGGER.warn("Realtime lucene reader refresh thread got interrupted 
while sleeping: ", e);
-        Thread.currentThread().interrupt();
-      }
-    } // end while
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManager.java
new file mode 100644
index 0000000000..84e2e982fc
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManager.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.pinot.common.utils.ScalingThreadPoolExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class manages the refreshing of all realtime Lucene index readers. It 
uses an auto-scaling pool of threads,
+ * which expands up to a configurable size, to refresh the readers.
+ * <p>
+ * During instantiation of a RealtimeLuceneTextIndex the corresponding 
SearcherManager is registered with this class.
+ * When the RealtimeLuceneTextIndex is closed, the flag set by the 
RealtimeLuceneTextIndex is checked before attempting
+ * a refresh, and SearcherManagerHolder instance previously registered to this 
class is dropped from the queue of
+ * readers to be refreshed.
+ */
+public class RealtimeLuceneIndexRefreshManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeLuceneIndexRefreshManager.class);
+  // max number of parallel refresh threads
+  private final int _maxParallelism;
+  // delay between refresh iterations
+  private int _delayMs;
+  // partitioned lists of SearcherManagerHolders, each gets its own thread for 
refreshing. SearcherManagerHolders
+  // are added to the list with the smallest size to roughly balance the load 
across threads
+  private final List<List<SearcherManagerHolder>> _partitionedListsOfSearchers;
+  private static RealtimeLuceneIndexRefreshManager _singletonInstance;
+  private static ExecutorService _executorService;
+
+  private RealtimeLuceneIndexRefreshManager(int maxParallelism, int delayMs) {
+    _maxParallelism = maxParallelism;
+    _delayMs = delayMs;
+    // Set min pool size to 0, scale up/down as needed. Set keep alive time to 
0, as threads are generally long-lived
+    _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, 
_maxParallelism, 0L);
+    _partitionedListsOfSearchers = new ArrayList<>();
+  }
+
+  public static RealtimeLuceneIndexRefreshManager getInstance() {
+    Preconditions.checkArgument(_singletonInstance != null,
+        "RealtimeLuceneIndexRefreshManager.init() must be called first");
+    return _singletonInstance;
+  }
+
+  /**
+   * Initializes the RealtimeLuceneIndexRefreshManager with the given 
maxParallelism and delayMs. This is
+   * intended to be called only once at the beginning of the server lifecycle.
+   * @param maxParallelism maximum number of refresh threads to use
+   * @param delayMs minimum delay between refreshes
+   */
+  public static RealtimeLuceneIndexRefreshManager init(int maxParallelism, int 
delayMs) {
+    _singletonInstance = new RealtimeLuceneIndexRefreshManager(maxParallelism, 
delayMs);
+    return _singletonInstance;
+  }
+
+  @VisibleForTesting
+  public void reset() {
+    _partitionedListsOfSearchers.clear();
+    _executorService.shutdownNow();
+    _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, 
_maxParallelism, 0L);
+  }
+
+  @VisibleForTesting
+  public void setDelayMs(int delayMs) {
+    _delayMs = delayMs;
+  }
+
+  /**
+   * Add a new SearcherManagerHolder and submit it to the executor service for 
refreshing.
+   * <p>
+   * If the _partitionedListsOfSearchers has less than _maxParallelism lists, 
a new list is created and submitted to
+   * the executor service to begin refreshing. If there are already 
_maxParallelism lists, the SearcherManagerHolder
+   * will be added to the list with the smallest size. If the smallest list is 
empty, it will be submitted to the
+   * executor as the old one was stopped.
+   * <p>
+   * The RealtimeLuceneRefreshRunnable will drop closed indexes from the list, 
and if all indexes are closed, the
+   * empty list will ensure the Runnable finishes. ScalingThreadPoolExecutor 
will then scale down the number of
+   * threads. This ensures that we do not leave any threads if there are no 
tables with text index, or text indices
+   * are removed on the server through actions such as config update or 
re-balance.
+   */
+  public synchronized void addSearcherManagerHolder(SearcherManagerHolder 
searcherManagerHolder) {
+    if (_partitionedListsOfSearchers.size() < _maxParallelism) {
+      List<SearcherManagerHolder> searcherManagers = 
Collections.synchronizedList(new ArrayList<>());
+      searcherManagers.add(searcherManagerHolder);
+      _partitionedListsOfSearchers.add(searcherManagers);
+      _executorService.submit(new 
RealtimeLuceneRefreshRunnable(searcherManagers, _delayMs));
+      return;
+    }
+
+    List<SearcherManagerHolder> smallestList = null;
+    for (List<SearcherManagerHolder> list : _partitionedListsOfSearchers) {
+      if (smallestList == null || list.size() < smallestList.size()) {
+        smallestList = list;
+      }
+    }
+    assert smallestList != null;
+    smallestList.add(searcherManagerHolder);
+
+    // If the list was empty before adding the SearcherManagerHolder, the 
runnable containing the list
+    // has exited or will soon exit. Therefore, we need to submit a new 
runnable to the executor for the list.
+    if (smallestList.size() == 1) {
+      _executorService.submit(new RealtimeLuceneRefreshRunnable(smallestList, 
_delayMs));
+    }
+  }
+
+  /**
+   * Blocks for up to 45 seconds waiting for refreshes of realtime Lucene 
index readers to complete.
+   * If all segments were previously closed, it should return immediately.
+   */
+  public boolean awaitTermination() {
+    // Interrupts will be handled by the RealtimeLuceneRefreshRunnable refresh 
loop. In general, all
+    // indexes should be marked closed before this method is called, and 
_executorService should
+    // shutdown immediately as there are no active threads. If for some reason 
an index did not close correctly,
+    // SearcherManager.maybeRefresh() should be on the order of seconds in the 
worst case and this should
+    // return shortly after.
+    _executorService.shutdownNow();
+    boolean terminated = false;
+    try {
+      terminated = _executorService.awaitTermination(45, TimeUnit.SECONDS);
+      if (!terminated) {
+        LOGGER.warn("Realtime Lucene index refresh pool did not terminate in 
45 seconds.");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting for realtime Lucene index refresh 
to shutdown.");
+    }
+    return terminated;
+  }
+
+  @VisibleForTesting
+  public int getPoolSize() {
+    return ((ThreadPoolExecutor) _executorService).getPoolSize();
+  }
+
+  @VisibleForTesting
+  public List<Integer> getListSizes() {
+    return 
_partitionedListsOfSearchers.stream().map(List::size).sorted().collect(Collectors.toList());
+  }
+
+  /**
+   * SearcherManagerHolder is a class that holds a SearcherManager instance 
for a segment and column. Instances
+   * of this class should be registered with the 
RealtimeLuceneIndexRefreshManager class to manage refreshing of
+   * the SearcherManager instance it holds.
+   */
+  public static class SearcherManagerHolder {
+    private final String _segmentName;
+    private final String _columnName;
+    private final Lock _lock;
+    private volatile boolean _indexClosed;
+    private final SearcherManager _searcherManager;
+
+    public SearcherManagerHolder(String segmentName, String columnName, 
SearcherManager searcherManager) {
+      _segmentName = segmentName;
+      _columnName = columnName;
+      _lock = new ReentrantLock();
+      _indexClosed = false;
+      _searcherManager = searcherManager;
+    }
+
+    public void setIndexClosed() {
+      _indexClosed = true;
+    }
+
+    public Lock getLock() {
+      return _lock;
+    }
+
+    public String getSegmentName() {
+      return _segmentName;
+    }
+
+    public String getColumnName() {
+      return _columnName;
+    }
+
+    public SearcherManager getSearcherManager() {
+      return _searcherManager;
+    }
+
+    public boolean isIndexClosed() {
+      return _indexClosed;
+    }
+  }
+
+  /**
+   * Runnable that refreshes a list of SearcherManagerHolder instances. This 
class is responsible for refreshing
+   * each SearcherManagerHolder in the list, and re-adding it to the list if 
it has not been closed. If every
+   * instance has been closed, the thread will terminate as the list size will 
be empty.
+   */
+  private static class RealtimeLuceneRefreshRunnable implements Runnable {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeLuceneRefreshRunnable.class);
+    private final int _delayMs;
+    private final List<SearcherManagerHolder> _searchers;
+
+    public RealtimeLuceneRefreshRunnable(List<SearcherManagerHolder> 
searchers, int delayMs) {
+      _searchers = searchers;
+      _delayMs = delayMs;
+    }
+
+    @Override
+    public void run() {
+      int i = 0; // current index in _searchers
+      while (!_searchers.isEmpty() && i <= _searchers.size() && 
!Thread.interrupted()) {
+        if (i == _searchers.size()) {
+          i = 0; // reset cursor to the beginning if we've reached the end
+        }
+        SearcherManagerHolder searcherManagerHolder = _searchers.get(i);
+        assert searcherManagerHolder != null;
+        searcherManagerHolder.getLock().lock();
+        try {
+          if (searcherManagerHolder.isIndexClosed()) {
+            _searchers.remove(i);
+            continue; // do not increment i, as the remaining elements in the 
list have been shifted
+          }
+
+          if (!searcherManagerHolder.isIndexClosed()) {
+            try {
+              searcherManagerHolder.getSearcherManager().maybeRefresh();
+            } catch (Exception e) {
+              // we should never be here since the locking semantics between 
RealtimeLuceneTextIndex.close()
+              // and this code along with volatile state isIndexClosed protect 
against the cases where this thread
+              // might attempt to refresh a realtime lucene reader after it 
has already been closed during
+              // RealtimeLuceneTextIndex.commit()
+              LOGGER.warn("Caught exception {} while refreshing realtime 
lucene reader for segment: {} and column: {}",
+                  e, searcherManagerHolder.getSegmentName(), 
searcherManagerHolder.getColumnName());
+            }
+            i++;
+          }
+        } finally {
+          searcherManagerHolder.getLock().unlock();
+        }
+
+        try {
+          Thread.sleep(_delayMs);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java
deleted file mode 100644
index e2330a6550..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshState.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.realtime.impl.invertedindex;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * This class manages the realtime lucene index readers. Creates a global
- * queue with all the realtime segment lucene index readers across
- * all tables and manages their refresh using {@link 
RealtimeLuceneIndexReaderRefreshThread}
- *
- * TODO: eventually we should explore partitioning this queue on per table 
basis
- */
-public class RealtimeLuceneIndexRefreshState {
-  private static RealtimeLuceneIndexRefreshState _singletonInstance;
-  private static RealtimeLuceneIndexReaderRefreshThread _realtimeRefreshThread;
-  private final Lock _mutex;
-  private final Condition _conditionVariable;
-  private static ConcurrentLinkedQueue<RealtimeLuceneReaders> 
_luceneRealtimeReaders;
-
-  private RealtimeLuceneIndexRefreshState() {
-    _mutex = new ReentrantLock();
-    _conditionVariable = _mutex.newCondition();
-    _luceneRealtimeReaders = new ConcurrentLinkedQueue<>();
-  }
-
-  /**
-   * Used by HelixServerStarter during bootstrap to create the singleton
-   * instance of this class and start the realtime reader refresh thread.
-   */
-  public void start() {
-    _realtimeRefreshThread =
-        new RealtimeLuceneIndexReaderRefreshThread(_luceneRealtimeReaders, 
_mutex, _conditionVariable);
-    Thread t = new Thread(_realtimeRefreshThread);
-    t.start();
-  }
-
-  /**
-   * Used by HelixServerStarter during shutdown. This sets the volatile
-   * "stopped" variable to indicate the shutdown to refresh thread.
-   * Since refresh thread might be suspended waiting on the condition variable,
-   * we signal the condition variable for the refresh thread to wake up,
-   * check that shutdown has been initiated and exit.
-   */
-  public void stop() {
-    _realtimeRefreshThread.setStopped();
-    _mutex.lock();
-    _conditionVariable.signal();
-    _mutex.unlock();
-  }
-
-  public static RealtimeLuceneIndexRefreshState getInstance() {
-    if (_singletonInstance == null) {
-      synchronized (RealtimeLuceneIndexRefreshState.class) {
-        if (_singletonInstance == null) {
-          _singletonInstance = new RealtimeLuceneIndexRefreshState();
-        }
-      }
-    }
-    return _singletonInstance;
-  }
-
-  public void addRealtimeReadersToQueue(RealtimeLuceneReaders 
readersForRealtimeSegment) {
-    _mutex.lock();
-    _luceneRealtimeReaders.offer(readersForRealtimeSegment);
-    _conditionVariable.signal();
-    _mutex.unlock();
-  }
-
-  /**
-   * Since the text index is maintained per TEXT column (similar to other 
Pinot indexes),
-   * there could be multiple lucene indexes for a given segment and therefore 
there can be
-   * multiple realtime lucene readers (one for each index/column) for the 
particular
-   * realtime segment.
-   */
-  public static class RealtimeLuceneReaders {
-    private final String _segmentName;
-    private final Lock _lock;
-    private boolean _segmentDestroyed;
-    private final List<RealtimeLuceneTextIndex> _realtimeLuceneReaders;
-
-    public RealtimeLuceneReaders(String segmentName) {
-      _segmentName = segmentName;
-      _lock = new ReentrantLock();
-      _segmentDestroyed = false;
-      _realtimeLuceneReaders = new LinkedList<>();
-    }
-
-    public void addReader(RealtimeLuceneTextIndex 
realtimeLuceneTextIndexReader) {
-      _realtimeLuceneReaders.add(realtimeLuceneTextIndexReader);
-    }
-
-    public void setSegmentDestroyed() {
-      _segmentDestroyed = true;
-    }
-
-    public Lock getLock() {
-      return _lock;
-    }
-
-    public String getSegmentName() {
-      return _segmentName;
-    }
-
-    public List<RealtimeLuceneTextIndex> getRealtimeLuceneReaders() {
-      return _realtimeLuceneReaders;
-    }
-
-    public void clearRealtimeReaderList() {
-      _realtimeLuceneReaders.clear();
-    }
-
-    boolean isSegmentDestroyed() {
-      return _segmentDestroyed;
-    }
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index 0175bc5109..961f028629 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -58,6 +58,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
   private final boolean _reuseMutableIndex;
   private boolean _enablePrefixSuffixMatchingInPhraseQueries = false;
   private final RealtimeLuceneRefreshListener _refreshListener;
+  private final RealtimeLuceneIndexRefreshManager.SearcherManagerHolder 
_searcherManagerHolder;
 
   /**
    * Created by {@link MutableSegmentImpl}
@@ -92,6 +93,11 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
       _analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
       _enablePrefixSuffixMatchingInPhraseQueries = 
config.isEnablePrefixSuffixMatchingInPhraseQueries();
       _reuseMutableIndex = config.isReuseMutableIndex();
+
+      // Submit the searcher manager to the global pool for refreshing
+      _searcherManagerHolder =
+          new 
RealtimeLuceneIndexRefreshManager.SearcherManagerHolder(segmentName, column, 
_searcherManager);
+      
RealtimeLuceneIndexRefreshManager.getInstance().addSearcherManagerHolder(_searcherManagerHolder);
     } catch (Exception e) {
       LOGGER.error("Failed to instantiate realtime Lucene index reader for 
column {}, exception {}", column,
           e.getMessage());
@@ -179,7 +185,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
       while (luceneDocIDIterator.hasNext()) {
         int luceneDocId = luceneDocIDIterator.next();
         Document document = indexSearcher.doc(luceneDocId);
-        int pinotDocId = 
Integer.valueOf(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME));
+        int pinotDocId = 
Integer.parseInt(document.get(LuceneTextIndexCreator.LUCENE_INDEX_DOC_ID_COLUMN_NAME));
         actualDocIDs.add(pinotDocId);
       }
     } catch (Exception e) {
@@ -196,6 +202,18 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
     }
     try {
       _indexCreator.getIndexWriter().commit();
+      // Set the SearcherManagerHolder.indexClosed() flag to stop generating 
refreshed readers
+      _searcherManagerHolder.getLock().lock();
+      try {
+        _searcherManagerHolder.setIndexClosed();
+        // Block for one final refresh, to ensure queries are fully up to date 
while segment is being converted
+        _searcherManager.maybeRefreshBlocking();
+      } finally {
+        _searcherManagerHolder.getLock().unlock();
+      }
+      // It is OK to close the index writer as we are done indexing, and no 
more refreshes will take place
+      // The SearcherManager will still provide an up-to-date reader via 
.acquire()
+      _indexCreator.getIndexWriter().close();
     } catch (Exception e) {
       LOGGER.error("Failed to commit the realtime lucene text index for column 
{}, exception {}", _column,
           e.getMessage());
@@ -206,6 +224,14 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
   @Override
   public void close() {
     try {
+      // Set the SearcherManagerHolder.indexClosed() flag to stop generating 
refreshed readers. If completionMode is
+      // set as DOWNLOAD, then commit() will not be called the flag must be 
set here.
+      _searcherManagerHolder.getLock().lock();
+      try {
+        _searcherManagerHolder.setIndexClosed();
+      } finally {
+        _searcherManagerHolder.getLock().unlock();
+      }
       _searcherManager.close();
       _searcherManager = null;
       _refreshListener.close(); // clean up metrics prior to closing 
_indexCreator, as they contain a reference to it
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
index fc2a9ae151..a5262b489f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java
@@ -36,11 +36,11 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.NoMergeScheduler;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.NRTCachingDirectory;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.LuceneNRTCachingMergePolicy;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
 import 
org.apache.pinot.segment.local.segment.index.text.AbstractTextIndexCreator;
@@ -132,19 +132,12 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
       indexWriterConfig.setCommitOnClose(commit);
       indexWriterConfig.setUseCompoundFile(config.isLuceneUseCompoundFile());
 
-      // For the realtime segment, to reuse mutable index, we should set the 
two write configs below.
-      // The realtime segment will call .commit() on the IndexWriter when 
segment conversion occurs.
-      // By default, Lucene will sometimes choose to merge segments in the 
background, which is problematic because
-      // the lucene index directory's contents is copied to create the 
immutable segment. If a background merge
-      // occurs during this copy, a FileNotFoundException will be triggered 
and segment build will fail.
-      //
       // Also, for the realtime segment, we set the OpenMode to CREATE to 
ensure that any existing artifacts
       // will be overwritten. This is necessary because the realtime segment 
can be created multiple times
       // during a server crash and restart scenario. If the existing artifacts 
are appended to, the realtime
       // query results will be accurate, but after segment conversion the 
mapping file generated will be loaded
       // for only the first numDocs lucene docIds, which can cause 
IndexOutOfBounds errors.
       if (!_commitOnClose && config.isReuseMutableIndex()) {
-        indexWriterConfig.setMergeScheduler(NoMergeScheduler.INSTANCE);
         indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
       }
 
@@ -166,7 +159,9 @@ public class LuceneTextIndexCreator extends 
AbstractTextIndexCreator {
         LOGGER.info(
             "Using NRTCachingDirectory for realtime lucene index for segment 
{} and column {} with buffer size: {}MB",
             segmentIndexDir, column, bufSize);
-        _indexDirectory = new 
NRTCachingDirectory(FSDirectory.open(_indexFile.toPath()), bufSize, bufSize);
+        NRTCachingDirectory dir = new 
NRTCachingDirectory(FSDirectory.open(_indexFile.toPath()), bufSize, bufSize);
+        indexWriterConfig.setMergePolicy(new LuceneNRTCachingMergePolicy(dir));
+        _indexDirectory = dir;
       } else {
         _indexDirectory = FSDirectory.open(_indexFile.toPath());
       }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
index 390a2d158e..84254060b3 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java
@@ -38,6 +38,7 @@ import 
org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
 import 
org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -495,6 +496,7 @@ public class RealtimeSegmentConverterTest {
 
     // create mutable segment impl
     RealtimeLuceneTextIndexSearcherPool.init(1);
+    RealtimeLuceneIndexRefreshManager.init(1, 10);
     MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
     List<GenericRow> rows = generateTestDataForReusePath();
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
index 324cc1fa5d..cc46486606 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
@@ -19,14 +19,15 @@
 package org.apache.pinot.segment.local.realtime.impl.invertedindex;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
-import org.apache.lucene.search.SearcherManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.spi.index.TextIndexConfig;
+import org.apache.pinot.util.TestUtils;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
@@ -46,6 +47,7 @@ public class LuceneMutableTextIndexTest {
   private RealtimeLuceneTextIndex _realtimeLuceneTextIndex;
 
   public LuceneMutableTextIndexTest() {
+    RealtimeLuceneIndexRefreshManager.init(1, 10);
     ServerMetrics.register(mock(ServerMetrics.class));
   }
 
@@ -80,13 +82,6 @@ public class LuceneMutableTextIndexTest {
         _realtimeLuceneTextIndex.add(row);
       }
     }
-
-    SearcherManager searcherManager = 
_realtimeLuceneTextIndex.getSearcherManager();
-    try {
-      searcherManager.maybeRefresh();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
   }
 
   @AfterClass
@@ -96,6 +91,14 @@ public class LuceneMutableTextIndexTest {
 
   @Test
   public void testQueries() {
+    TestUtils.waitForCondition(aVoid -> {
+          try {
+            return 
_realtimeLuceneTextIndex.getSearcherManager().isSearcherCurrent();
+          } catch (IOException e) {
+            return false;
+          }
+        }, 10000,
+        "Background pool did not refresh the searcher manager in time");
     assertEquals(_realtimeLuceneTextIndex.getDocIds("stream"), 
ImmutableRoaringBitmap.bitmapOf(0));
     assertEquals(_realtimeLuceneTextIndex.getDocIds("/.*house.*/"), 
ImmutableRoaringBitmap.bitmapOf(1));
     assertEquals(_realtimeLuceneTextIndex.getDocIds("invalid"), 
ImmutableRoaringBitmap.bitmapOf());
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
index 932c851be0..84765353b0 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
@@ -73,6 +73,7 @@ public class NativeAndLuceneMutableTextIndexTest {
   @BeforeClass
   public void setUp()
       throws Exception {
+    RealtimeLuceneIndexRefreshManager.init(1, 10);
     ServerMetrics.register(mock(ServerMetrics.class));
     TextIndexConfig config =
         new TextIndexConfig(false, null, null, false, false, null, null, true, 
500, null, false, false, 0);
@@ -101,7 +102,7 @@ public class NativeAndLuceneMutableTextIndexTest {
     searcherManagers.add(_realtimeLuceneMVTextIndex.getSearcherManager());
     try {
       for (SearcherManager searcherManager : searcherManagers) {
-        searcherManager.maybeRefresh();
+        searcherManager.maybeRefreshBlocking();
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManagerTest.java
new file mode 100644
index 0000000000..351ea336bb
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexRefreshManagerTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link RealtimeLuceneIndexRefreshManager}: scaling of the refresh thread pool as searcher manager
+ * holders are added and closed, distribution of holders across the refresh queues, the delay between
+ * refreshes, and termination. All {@link SearcherManager}s are Mockito mocks.
+ */
+public class RealtimeLuceneIndexRefreshManagerTest {
+
+  @BeforeClass
+  public void init() {
+    RealtimeLuceneIndexRefreshManager.init(2, 0);
+  }
+
+  @BeforeMethod
+  public void setup() {
+    RealtimeLuceneIndexRefreshManager.getInstance().reset();
+  }
+
+  @Test
+  public void testSingleSearcherManager() {
+    RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager =
+        RealtimeLuceneIndexRefreshManager.getInstance();
+
+    assertEquals(realtimeLuceneIndexRefreshManager.getPoolSize(), 0, "Initial pool size should be 0");
+
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder = getSearcherManagerHolder(1);
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder);
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 1, 5000,
+        "Timed out waiting for thread pool to scale up for the initial searcher manager holder");
+
+    searcherManagerHolder.setIndexClosed();
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 0, 5000,
+        "Timed out waiting for thread pool to scale down");
+  }
+
+  @Test
+  public void testManySearcherManagers() {
+    RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager =
+        RealtimeLuceneIndexRefreshManager.getInstance();
+
+    assertEquals(realtimeLuceneIndexRefreshManager.getPoolSize(), 0, "Initial pool size should be 0");
+
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder1 = getSearcherManagerHolder(1);
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder2 = getSearcherManagerHolder(2);
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder3 = getSearcherManagerHolder(3);
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder4 = getSearcherManagerHolder(4);
+
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder1);
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 1, 5000,
+        "Timed out waiting for thread pool to scale up for the initial searcher manager holder");
+
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder2);
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 2, 5000,
+        "Timed out waiting for thread pool to scale up for the second searcher manager holder");
+
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder3);
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getListSizes().equals(List.of(1, 2)), 5000,
+        "Timed out waiting for the searcher manager holder to be added to another queue");
+
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder4);
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getListSizes().equals(List.of(2, 2)), 5000,
+        "Timed out waiting for the searcher manager holder to be added to the smallest queue");
+
+    searcherManagerHolder1.setIndexClosed();
+    searcherManagerHolder2.setIndexClosed();
+    searcherManagerHolder3.setIndexClosed();
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 1, 5000,
+        "Timed out waiting for thread pool to scale down as only one searcher manager holder is left");
+
+    searcherManagerHolder4.setIndexClosed();
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 0, 5000,
+        "Timed out waiting for thread pool to scale down as all searcher manager holders have been closed");
+  }
+
+  @Test
+  public void testDelayMs()
+      throws Exception {
+    RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager =
+        RealtimeLuceneIndexRefreshManager.getInstance();
+    realtimeLuceneIndexRefreshManager.setDelayMs(501);
+    try {
+      RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder = getSearcherManagerHolder(1);
+      realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder);
+
+      // With a 501ms delay, refreshes happen at roughly 0ms, 501ms and 1002ms, and the next one at ~1503ms.
+      // Closing the holder at ~1250ms (midway) keeps the expected count at 3 despite scheduling jitter; closing at
+      // 1500ms left only ~3ms of slack before a 4th refresh could be counted.
+      Thread.sleep(1250);
+      searcherManagerHolder.setIndexClosed();
+
+      SearcherManager searcherManager = searcherManagerHolder.getSearcherManager();
+      verify(searcherManager, times(3)).maybeRefresh();
+    } finally {
+      // Restore the delay configured in init() so the 501ms delay does not leak into other test methods
+      realtimeLuceneIndexRefreshManager.setDelayMs(0);
+    }
+  }
+
+  @Test
+  public void testTerminationForEmptyPool() {
+    assertTrue(RealtimeLuceneIndexRefreshManager.getInstance().awaitTermination());
+  }
+
+  @Test
+  public void testTerminationWhileActive() {
+    RealtimeLuceneIndexRefreshManager realtimeLuceneIndexRefreshManager =
+        RealtimeLuceneIndexRefreshManager.getInstance();
+
+    assertEquals(realtimeLuceneIndexRefreshManager.getPoolSize(), 0, "Initial pool size should be 0");
+
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder1 = getSearcherManagerHolder(1);
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder2 = getSearcherManagerHolder(2);
+    RealtimeLuceneIndexRefreshManager.SearcherManagerHolder searcherManagerHolder3 = getSearcherManagerHolder(3);
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder1);
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder2);
+    realtimeLuceneIndexRefreshManager.addSearcherManagerHolder(searcherManagerHolder3);
+
+    TestUtils.waitForCondition(aVoid -> realtimeLuceneIndexRefreshManager.getPoolSize() == 2, 10000,
+        "Timed out waiting for thread pool to scale up");
+
+    searcherManagerHolder1.setIndexClosed();
+    searcherManagerHolder2.setIndexClosed();
+    searcherManagerHolder3.setIndexClosed();
+    assertTrue(RealtimeLuceneIndexRefreshManager.getInstance().awaitTermination());
+  }
+
+  /** Builds a holder for segment {@code "segment" + id}, column "col", backed by a mocked SearcherManager. */
+  private RealtimeLuceneIndexRefreshManager.SearcherManagerHolder getSearcherManagerHolder(int id) {
+    SearcherManager searcherManager = mock(SearcherManager.class);
+    try {
+      when(searcherManager.maybeRefresh()).thenReturn(true);
+    } catch (IOException e) {
+      Assert.fail("Mocked searcher manager should not throw exception");
+    }
+    return new RealtimeLuceneIndexRefreshManager.SearcherManagerHolder("segment" + id, "col", searcherManager);
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index b961582a9e..54e118634f 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -72,7 +72,7 @@ import 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManage
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
-import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.server.access.AccessControlFactory;
@@ -142,8 +142,8 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
   protected AccessControlFactory _accessControlFactory;
   protected AdminApiApplication _adminApiApplication;
   protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker;
-  protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState;
   protected RealtimeLuceneTextIndexSearcherPool 
_realtimeLuceneTextIndexSearcherPool;
+  protected RealtimeLuceneIndexRefreshManager 
_realtimeLuceneTextIndexRefreshManager;
   protected PinotEnvironmentProvider _pinotEnvironmentProvider;
   protected volatile boolean _isServerReadyToServeQueries = false;
 
@@ -587,6 +587,15 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         _serverConf.getProperty(ResourceManager.QUERY_WORKER_CONFIG_KEY, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
     _realtimeLuceneTextIndexSearcherPool = 
RealtimeLuceneTextIndexSearcherPool.init(queryWorkerThreads);
 
+    // Initialize RealtimeLuceneIndexRefreshManager with max refresh threads 
and min refresh interval configs
+    LOGGER.info("Initializing lucene refresh manager");
+    int luceneMaxRefreshThreads =
+        _serverConf.getProperty(Server.LUCENE_MAX_REFRESH_THREADS, 
Server.DEFAULT_LUCENE_MAX_REFRESH_THREADS);
+    int luceneMinRefreshIntervalDuration =
+        _serverConf.getProperty(Server.LUCENE_MIN_REFRESH_INTERVAL_MS, 
Server.DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS);
+    _realtimeLuceneTextIndexRefreshManager =
+        RealtimeLuceneIndexRefreshManager.init(luceneMaxRefreshThreads, 
luceneMinRefreshIntervalDuration);
+
     LOGGER.info("Initializing server instance and registering state model 
factory");
     Utils.logVersions();
     ControllerLeaderLocator.create(_helixManager);
@@ -684,9 +693,6 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     _serverQueriesDisabledTracker =
         new ServerQueriesDisabledTracker(_helixClusterName, _instanceId, 
_helixManager, serverMetrics);
     _serverQueriesDisabledTracker.start();
-
-    _realtimeLuceneIndexRefreshState = 
RealtimeLuceneIndexRefreshState.getInstance();
-    _realtimeLuceneIndexRefreshState.start();
   }
 
   /**
@@ -719,9 +725,6 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     if (_serverQueriesDisabledTracker != null) {
       _serverQueriesDisabledTracker.stop();
     }
-    if (_realtimeLuceneIndexRefreshState != null) {
-      _realtimeLuceneIndexRefreshState.stop();
-    }
     try {
       // Close PinotFS after all data managers are shutdown. Otherwise, 
segments which are being committed will not
       // be uploaded to the deep-store.
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b6fec05b31..6abdec73e0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -726,6 +726,11 @@ public class CommonConstants {
 
     public static final String 
CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX = 
"consumer.client.id.suffix";
 
+    public static final String LUCENE_MAX_REFRESH_THREADS = 
"pinot.server.lucene.max.refresh.threads";
+    public static final int DEFAULT_LUCENE_MAX_REFRESH_THREADS = 1;
+    public static final String LUCENE_MIN_REFRESH_INTERVAL_MS = 
"pinot.server.lucene.min.refresh.interval.ms";
+    public static final int DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS = 10;
+
     public static class SegmentCompletionProtocol {
       public static final String PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER = 
"pinot.server.segment.uploader";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to