This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 211cf8a Fix the race condition in realtime text index refresh thread (#6858) (#6990) 211cf8a is described below commit 211cf8a8b9476a9756d2b40f1d0660cfcb1d4235 Author: Sharayu <gandhi16.shar...@gmail.com> AuthorDate: Thu May 27 23:47:02 2021 -0700 Fix the race condition in realtime text index refresh thread (#6858) (#6990) --- .../RealtimeLuceneIndexReaderRefreshThread.java | 52 +++++++++++----------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java index 1377b5e..4ea84a1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneIndexReaderRefreshThread.java @@ -66,34 +66,34 @@ public class RealtimeLuceneIndexReaderRefreshThread implements Runnable { @Override public void run() { while (!_stopped) { - while (_luceneRealtimeReaders.isEmpty()) { - _mutex.lock(); - try { - // During instantiation of a given MutableSegmentImpl, we will signal on this condition variable once - // one or more realtime lucene readers (one per column) belonging to the MutableSegment - // are added to the global queue managed by this thread. The thread that signals will - // grab this mutex and signal on the condition variable. - // - // This refresh thread will be woken up (and grab the mutex automatically as per the - // implementation of await) and check if the queue is non-empty. It will then proceed to - // poll the queue and refresh the realtime index readers for the polled segment. - // - // The mutex and condition-variable semantics take care of the scenario when on - // a given Pinot server, there is no realtime segment with text index enabled. In such - // cases, there is no need for this thread to wake up simply after every few seconds/minutes - // only to find that there is nothing to be refreshed. The thread should simply be - // off CPU until signalled specifically. This also covers the situation where initially - // there were few realtime segments of a table with text index. Later if they got - // moved to another server as part of rebalance, then again there is no need for this thread - // to do anything until some realtime segment is created with text index enabled. + _mutex.lock(); + try { + // During instantiation of a given MutableSegmentImpl, we will signal on this condition variable once + // one or more realtime lucene readers (one per column) belonging to the MutableSegment + // are added to the global queue managed by this thread. The thread that signals will + // grab this mutex and signal on the condition variable. + // + // This refresh thread will be woken up (and grab the mutex automatically as per the + // implementation of await) and check if the queue is non-empty. It will then proceed to + // poll the queue and refresh the realtime index readers for the polled segment. + // + // The mutex and condition-variable semantics take care of the scenario when on + // a given Pinot server, there is no realtime segment with text index enabled. In such + // cases, there is no need for this thread to wake up simply after every few seconds/minutes + // only to find that there is nothing to be refreshed. The thread should simply be + // off CPU until signalled specifically. This also covers the situation where initially + // there were few realtime segments of a table with text index. Later if they got + // moved to another server as part of rebalance, then again there is no need for this thread + // to do anything until some realtime segment is created with text index enabled. + while (_luceneRealtimeReaders.isEmpty()) { _conditionVariable.await(); - } catch (InterruptedException e) { - LOGGER.warn("Realtime lucene reader refresh thread got interrupted while waiting on condition variable: ", e); - Thread.currentThread().interrupt(); - } finally { - _mutex.unlock(); } - } // end while + } catch (InterruptedException e) { + LOGGER.warn("Realtime lucene reader refresh thread got interrupted while waiting on condition variable: ", e); + Thread.currentThread().interrupt(); + } finally { + _mutex.unlock(); + } // check if shutdown has been initiated if (_stopped) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org