This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 497d0517ab Introduce Real Time Native Text Index (#8636)
497d0517ab is described below

commit 497d0517abfe62f7a95f9eb0602a2c5fa07dc531
Author: Atri Sharma <atri.j...@gmail.com>
AuthorDate: Tue May 10 22:16:11 2022 +0530

    Introduce Real Time Native Text Index (#8636)
    
    This commit introduces the real-time native text index. The text index is 
real-time, i.e., as data comes in, it is available for search.
    No refresh is required. The index works with TEXT_CONTAINS.
    
    * Update 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeNativeTextIndex.java
    
    Co-authored-by: Xiaotian (Jackie) Jiang 
<17555551+jackie-ji...@users.noreply.github.com>
    
    * Rebase
    
    * Checkstyle fixes
    
    * Add missing check
    
    * More review comments
    
    * Update tests
    
    * Move to reusable analyzer and refactor current realtime test
    
    * More comments and checkstyle
    
    * Add Missing File
    
    * Update per comments
    
    Co-authored-by: Atri Sharma <securonix@securonixs-MacBook-Pro.local>
    Co-authored-by: Xiaotian (Jackie) Jiang 
<17555551+jackie-ji...@users.noreply.github.com>
    Co-authored-by: Atri Sharma <securonix@securonixs-MacBook-Pro-3.local>
---
 .../org/apache/pinot/core/plan/FilterPlanNode.java |   9 +-
 ...TextIndicesRealtimeClusterIntegrationTest.java} |  50 +++-
 .../indexsegment/mutable/MutableSegmentImpl.java   |  25 +-
 .../impl/invertedindex/NativeMutableTextIndex.java | 123 ++++++++++
 .../invertedindex/RealtimeLuceneTextIndex.java     |   2 +-
 .../NativeAndLuceneMutableTextIndexTest.java       | 107 +++++++++
 .../NativeMutableTextIndexConcurrentTest.java      | 251 +++++++++++++++++++++
 .../NativeMutableTextIndexReaderWriterTest.java    |  63 ++++++
 .../mutablefst/MutableFSTConcurrentTest.java       |  74 ++++--
 9 files changed, 664 insertions(+), 40 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 6260085d96..7bc1a4f642 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -49,6 +49,7 @@ import 
org.apache.pinot.core.operator.filter.predicate.FSTBasedRegexpPredicateEv
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import 
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex;
 import 
org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -228,7 +229,7 @@ public class FilterPlanNode implements PlanNode {
         ExpressionContext lhs = predicate.getLhs();
         if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
           if (canApplyH3IndexForDistanceCheck(predicate, lhs.getFunction())) {
-              return new H3IndexFilterOperator(_indexSegment, predicate, 
numDocs);
+            return new H3IndexFilterOperator(_indexSegment, predicate, 
numDocs);
           } else if (canApplyH3IndexForInclusionCheck(predicate, 
lhs.getFunction())) {
             return new H3InclusionIndexFilterOperator(_indexSegment, 
predicate, numDocs);
           } else {
@@ -246,14 +247,16 @@ public class FilterPlanNode implements PlanNode {
           switch (predicate.getType()) {
             case TEXT_CONTAINS:
               TextIndexReader textIndexReader = dataSource.getTextIndex();
-              if (!(textIndexReader instanceof NativeTextIndexReader)) {
+              if (!(textIndexReader instanceof NativeTextIndexReader)
+                  && !(textIndexReader instanceof NativeMutableTextIndex)) {
                 throw new UnsupportedOperationException("TEXT_CONTAINS is 
supported only on native text index");
               }
               return new TextContainsFilterOperator(textIndexReader, 
(TextContainsPredicate) predicate, numDocs);
             case TEXT_MATCH:
               textIndexReader = dataSource.getTextIndex();
               // We could check for real time and segment Lucene reader, but 
easier to check the other way round
-              if (textIndexReader instanceof NativeTextIndexReader) {
+              if (textIndexReader instanceof NativeTextIndexReader
+                  || textIndexReader instanceof NativeMutableTextIndex) {
                 throw new UnsupportedOperationException("TEXT_MATCH is not 
supported on native text index");
               }
               return new TextMatchFilterOperator(textIndexReader, 
(TextMatchPredicate) predicate, numDocs);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java
similarity index 75%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java
index fad182f957..7af80d0363 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java
@@ -25,7 +25,9 @@ import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -46,19 +48,25 @@ import static org.testng.AssertJUnit.fail;
 
 
 /**
- * Cluster integration test for near realtime text search
+ * Cluster integration test for near realtime text search (lucene) and 
realtime text search (native).
  */
-public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
+public class TextIndicesRealtimeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
   private static final String TEXT_COLUMN_NAME = "skills";
+  private static final String TEXT_COLUMN_NAME_NATIVE = "skills_native";
   private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
   private static final int NUM_SKILLS = 24;
   private static final int NUM_MATCHING_SKILLS = 4;
   private static final int NUM_RECORDS = NUM_SKILLS * 1000;
   private static final int NUM_MATCHING_RECORDS = NUM_MATCHING_SKILLS * 1000;
+  private static final int NUM_MATCHING_RECORDS_NATIVE = 7000;
 
   private static final String TEST_TEXT_COLUMN_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE TEXT_MATCH(skills, '\"machine 
learning\" AND spark')";
 
+  private static final String TEST_TEXT_COLUMN_QUERY_NATIVE =
+      "SELECT COUNT(*) FROM mytable WHERE TEXT_CONTAINS(skills_native, 'm.*') 
AND TEXT_CONTAINS(skills_native, "
+          + "'spark')";
+
   @Override
   public String getTimeColumnName() {
     return TIME_COLUMN_NAME;
@@ -79,7 +87,7 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
   @Nullable
   @Override
   protected List<String> getInvertedIndexColumns() {
-    return null;
+    return Collections.singletonList(TEXT_COLUMN_NAME_NATIVE);
   }
 
   @Override
@@ -101,8 +109,13 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
 
   @Override
   protected List<FieldConfig> getFieldConfigs() {
-    return Collections.singletonList(
-        new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, 
FieldConfig.IndexType.TEXT, null, null));
+    Map<String, String> propertiesMap = new HashMap<>();
+    propertiesMap.put(FieldConfig.TEXT_FST_TYPE, 
FieldConfig.TEXT_NATIVE_FST_LITERAL);
+
+    return Arrays.asList(
+        new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, 
FieldConfig.IndexType.TEXT, null, null),
+        new FieldConfig(TEXT_COLUMN_NAME_NATIVE, FieldConfig.EncodingType.RAW, 
FieldConfig.IndexType.TEXT, null,
+            propertiesMap));
   }
 
   @BeforeClass
@@ -125,6 +138,7 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
     // Create and upload the schema and table config
     Schema schema = new 
Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
         .addSingleValueDimension(TEXT_COLUMN_NAME, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(TEXT_COLUMN_NAME_NATIVE, 
FieldSpec.DataType.STRING)
         .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
     addSchema(schema);
     addTableConfig(createRealtimeTableConfig(avroFile));
@@ -172,6 +186,8 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
     org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
     avroSchema.setFields(Arrays.asList(new 
org.apache.avro.Schema.Field(TEXT_COLUMN_NAME,
             org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), 
null, null),
+        new org.apache.avro.Schema.Field(TEXT_COLUMN_NAME_NATIVE,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), 
null, null),
         new org.apache.avro.Schema.Field(TIME_COLUMN_NAME,
             org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), 
null, null)));
     try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
@@ -179,6 +195,7 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
       for (int i = 0; i < NUM_RECORDS; i++) {
         GenericData.Record record = new GenericData.Record(avroSchema);
         record.put(TEXT_COLUMN_NAME, skills.get(i % NUM_SKILLS));
+        record.put(TEXT_COLUMN_NAME_NATIVE, skills.get(i % NUM_SKILLS));
         record.put(TIME_COLUMN_NAME, System.currentTimeMillis());
         fileWriter.append(record);
       }
@@ -192,7 +209,7 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
     // Keep posting queries until all records are consumed
     long previousResult = 0;
     while (getCurrentCountStarResult() < NUM_RECORDS) {
-      long result = getTextColumnQueryResult();
+      long result = getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY);
       assertTrue(result >= previousResult);
       previousResult = result;
       Thread.sleep(100);
@@ -201,7 +218,7 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
     //Lucene index on consuming segments to update the latest records
     TestUtils.waitForCondition(aVoid -> {
       try {
-        return getTextColumnQueryResult() == NUM_MATCHING_RECORDS;
+        return getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY) == 
NUM_MATCHING_RECORDS;
       } catch (Exception e) {
         fail("Caught exception while getting text column query result");
         return false;
@@ -209,8 +226,23 @@ public class LuceneRealtimeClusterIntegrationTest extends 
BaseClusterIntegration
     }, 10_000L, "Failed to reach expected number of matching records");
   }
 
-  private long getTextColumnQueryResult()
+  @Test
+  public void testTextSearchCountQueryNative()
+      throws Exception {
+    // Keep posting queries until all records are consumed
+    long previousResult = 0;
+    while (getCurrentCountStarResult() < NUM_RECORDS) {
+      long result = getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE);
+      assertTrue(result >= previousResult);
+      previousResult = result;
+      Thread.sleep(100);
+    }
+
+    assertTrue(getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE) == 
NUM_MATCHING_RECORDS_NATIVE);
+  }
+
+  private long getTextColumnQueryResult(String query)
       throws Exception {
-    return 
postQuery(TEST_TEXT_COLUMN_QUERY).get("resultTable").get("rows").get(0).get(0).asLong();
+    return 
postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong();
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index d88389eeff..ecab67ed5d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -41,6 +41,7 @@ import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import 
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
 import org.apache.pinot.segment.local.realtime.impl.geospatial.MutableH3Index;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector;
@@ -300,20 +301,20 @@ public class MutableSegmentImpl implements MutableSegment 
{
         }
 
         if (useNativeTextIndex) {
-          // TODO: Add native text index support
-          _logger.warn("Mutable native text index is not supported, falling 
back to the Lucene text index");
-        }
+          textIndex = new NativeMutableTextIndex(column);
+        } else {
 
-        // TODO - this logic is in the wrong place and belongs in a 
Lucene-specific submodule,
-        //  it is beyond the scope of realtime index pluggability to do this 
refactoring, so realtime
-        //  text indexes remain statically defined. Revisit this after this 
refactoring has been done.
-        RealtimeLuceneTextIndex luceneTextIndex =
-            new RealtimeLuceneTextIndex(column, new 
File(config.getConsumerDir()), _segmentName);
-        if (_realtimeLuceneReaders == null) {
-          _realtimeLuceneReaders = new 
RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName);
+          // TODO - this logic is in the wrong place and belongs in a 
Lucene-specific submodule,
+          //  it is beyond the scope of realtime index pluggability to do this 
refactoring, so realtime
+          //  text indexes remain statically defined. Revisit this after this 
refactoring has been done.
+          RealtimeLuceneTextIndex luceneTextIndex =
+              new RealtimeLuceneTextIndex(column, new 
File(config.getConsumerDir()), _segmentName);
+          if (_realtimeLuceneReaders == null) {
+            _realtimeLuceneReaders = new 
RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName);
+          }
+          _realtimeLuceneReaders.addReader(luceneTextIndex);
+          textIndex = luceneTextIndex;
         }
-        _realtimeLuceneReaders.addReader(luceneTextIndex);
-        textIndex = luceneTextIndex;
       } else {
         textIndex = null;
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java
new file mode 100644
index 0000000000..1f27f57b5a
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST;
+import 
org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFSTImpl;
+import 
org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher;
+import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class NativeMutableTextIndex implements MutableTextIndex {
+  private final String _column;
+  private final MutableFST _mutableFST;
+  private final RealtimeInvertedIndex _invertedIndex;
+  //TODO: Move to mutable dictionary
+  private final Object2IntOpenHashMap<String> _termToDictIdMapping;
+  private final ReentrantReadWriteLock.ReadLock _readLock;
+  private final ReentrantReadWriteLock.WriteLock _writeLock;
+  private final Analyzer _analyzer;
+
+  private int _nextDocId = 0;
+  private int _nextDictId = 0;
+
+  public NativeMutableTextIndex(String column) {
+    _column = column;
+    _mutableFST = new MutableFSTImpl();
+    _termToDictIdMapping = new Object2IntOpenHashMap<>();
+    _invertedIndex = new RealtimeInvertedIndex();
+
+    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    _readLock = readWriteLock.readLock();
+    _writeLock = readWriteLock.writeLock();
+    _analyzer = new 
StandardAnalyzer(LuceneTextIndexCreator.ENGLISH_STOP_WORDS_SET);
+  }
+
+  @Override
+  public void add(String document) {
+    Iterable<String> tokens;
+
+    tokens = analyze(document);
+    _writeLock.lock();
+    try {
+      for (String token : tokens) {
+        Integer currentDictId = _termToDictIdMapping.computeIntIfAbsent(token, 
k -> {
+          int localDictId = _nextDictId++;
+          _mutableFST.addPath(token, localDictId);
+          return localDictId;
+        });
+        _invertedIndex.add(currentDictId, _nextDocId);
+      }
+      _nextDocId++;
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  @Override
+  public ImmutableRoaringBitmap getDictIds(String searchQuery) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public MutableRoaringBitmap getDocIds(String searchQuery) {
+    MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
+    _readLock.lock();
+    try {
+      RealTimeRegexpMatcher.regexMatch(searchQuery, _mutableFST,
+          dictId -> matchingDocIds.or(_invertedIndex.getDocIds(dictId)));
+      return matchingDocIds;
+    } finally {
+      _readLock.unlock();
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _analyzer.close();
+  }
+
+  private List<String> analyze(String document) {
+    List<String> tokens = new ArrayList<>();
+    try (TokenStream tokenStream = _analyzer.tokenStream(_column, document)) {
+      tokenStream.reset();
+      CharTermAttribute attribute = 
tokenStream.getAttribute(CharTermAttribute.class);
+      while (tokenStream.incrementToken()) {
+        tokens.add(attribute.toString());
+      }
+      tokenStream.end();
+    } catch (IOException e) {
+      throw new RuntimeException("Caught exception while tokenizing the 
document for column: " + _column, e);
+    }
+    return tokens;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index 60e7b36c2c..c2a3d9c9e8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -156,7 +156,7 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
     }
   }
 
-  SearcherManager getSearcherManager() {
+  public SearcherManager getSearcherManager() {
     return _searcherManager;
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
new file mode 100644
index 0000000000..b79482a1f0
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.lucene.search.SearcherManager;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class NativeAndLuceneMutableTextIndexTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"RealTimeNativeVsLuceneTest");
+  private static final String TEXT_COLUMN_NAME = "testColumnName";
+
+  private RealtimeLuceneTextIndex _realtimeLuceneTextIndex;
+  private NativeMutableTextIndex _nativeMutableTextIndex;
+
+  private List<String> getTextData() {
+    return Arrays.asList("Prince Andrew kept looking with an amused smile from 
Pierre",
+        "vicomte and from the vicomte to their hostess. In the first moment 
of",
+        "Pierre’s outburst Anna Pávlovna, despite her social experience, was",
+        "horror-struck. But when she saw that Pierre’s sacrilegious words",
+        "had not exasperated the vicomte, and had convinced herself that it 
was",
+        "impossible to stop him, she rallied her forces and joined the vicomte 
in", "a vigorous attack on the orator",
+        "horror-struck. But when she", "she rallied her forces and joined", 
"outburst Anna Pávlovna",
+        "she rallied her forces and", "despite her social experience", "had 
not exasperated the vicomte",
+        " despite her social experience", "impossible to stop him", "despite 
her social experience");
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    _realtimeLuceneTextIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, 
INDEX_DIR, "fooBar");
+    _nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME);
+    List<String> documents = getTextData();
+
+    for (String doc : documents) {
+      _realtimeLuceneTextIndex.add(doc);
+      _nativeMutableTextIndex.add(doc);
+    }
+
+    SearcherManager searcherManager = 
_realtimeLuceneTextIndex.getSearcherManager();
+    try {
+      searcherManager.maybeRefresh();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _realtimeLuceneTextIndex.close();
+  }
+
+  @Test
+  public void testQueries() {
+    String nativeQuery = "vico.*";
+    String luceneQuery = "vico*";
+    testSelectionResults(nativeQuery, luceneQuery);
+
+    nativeQuery = "vicomte";
+    luceneQuery = "vicomte";
+    testSelectionResults(nativeQuery, luceneQuery);
+
+    nativeQuery = "s.*";
+    luceneQuery = "s*";
+    testSelectionResults(nativeQuery, luceneQuery);
+
+    nativeQuery = "impossible";
+    luceneQuery = "impossible";
+    testSelectionResults(nativeQuery, luceneQuery);
+
+    nativeQuery = "forc.*s";
+    luceneQuery = "forc*s";
+    testSelectionResults(nativeQuery, luceneQuery);
+  }
+
+  private void testSelectionResults(String nativeQuery, String luceneQuery) {
+    MutableRoaringBitmap resultset = 
_realtimeLuceneTextIndex.getDocIds(luceneQuery);
+    MutableRoaringBitmap resultset2 = 
_nativeMutableTextIndex.getDocIds(nativeQuery);
+
+    assertEquals(_nativeMutableTextIndex.getDocIds(nativeQuery), 
_realtimeLuceneTextIndex.getDocIds(luceneQuery));
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexConcurrentTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexConcurrentTest.java
new file mode 100644
index 0000000000..8082504dd3
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexConcurrentTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class NativeMutableTextIndexConcurrentTest {
+  private ExecutorService _threadPool;
+  private Set<String> _resultSet;
+
+  @BeforeClass
+  private void setup() {
+    _threadPool = Executors.newFixedThreadPool(10);
+    _resultSet = new ConcurrentSkipListSet<>();
+  }
+
+  @AfterClass
+  private void shutDown() {
+    _threadPool.shutdown();
+  }
+
+  @Test
+  public void testConcurrentWriteAndRead()
+      throws InterruptedException, IOException {
+    CountDownLatch countDownLatch = new CountDownLatch(2);
+    List<String> words = new ArrayList<>();
+    words.add("ab");
+    words.add("abba");
+    words.add("aba");
+    words.add("bab");
+    words.add("cdd");
+    words.add("efg");
+
+    try (NativeMutableTextIndex textIndex = new 
NativeMutableTextIndex("testFSTColumn")) {
+      _threadPool.submit(() -> {
+        try {
+          performReads(textIndex, words, 20, 200, countDownLatch);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      _threadPool.submit(() -> {
+        try {
+          performWrites(textIndex, words, 10, countDownLatch);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      });
+
+      countDownLatch.await();
+    }
+
+    assertEquals(_resultSet.size(), words.size());
+
+    assertTrue(_resultSet.contains("ab"), "ab not found in result set");
+    assertTrue(_resultSet.contains("abba"), "abba not found in result set");
+    assertTrue(_resultSet.contains("aba"), "aba not found in result set");
+    assertTrue(_resultSet.contains("bab"), "bab not found in result set");
+    assertTrue(_resultSet.contains("cdd"), "cdd not found in result set");
+    assertTrue(_resultSet.contains("efg"), "efg not found in result set");
+  }
+
+  /**
+   * Runs one reader and two writers against the same index at the same time. Each writer adds its own three
+   * words; the reader polls for all six and records every word whose lookup returns exactly one document.
+   * The test passes when all six words ended up in the shared result set.
+   */
+  @Test
+  public void testConcurrentWriteWithMultipleThreads()
+      throws InterruptedException, IOException {
+    List<String> firstThreadWords = new ArrayList<>();
+    List<String> secondThreadWords = new ArrayList<>();
+    List<String> mergedThreadWords = new ArrayList<>();
+    // One count per submitted task: one reader plus two writers
+    CountDownLatch countDownLatch = new CountDownLatch(3);
+    firstThreadWords.add("ab");
+    firstThreadWords.add("abba");
+    firstThreadWords.add("aba");
+    secondThreadWords.add("bab");
+    secondThreadWords.add("cdd");
+    secondThreadWords.add("efg");
+
+    mergedThreadWords.addAll(firstThreadWords);
+    mergedThreadWords.addAll(secondThreadWords);
+
+    try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) {
+      _threadPool.submit(() -> {
+        try {
+          performReads(textIndex, mergedThreadWords, 20, 200, countDownLatch);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag before rethrowing so the interruption is not lost
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      _threadPool.submit(() -> {
+        try {
+          performWrites(textIndex, firstThreadWords, 10, countDownLatch);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      _threadPool.submit(() -> {
+        try {
+          performWrites(textIndex, secondThreadWords, 10, countDownLatch);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      // Keep the index open until every task has finished with it
+      countDownLatch.await();
+    }
+
+    assertEquals(_resultSet.size(), mergedThreadWords.size());
+
+    assertTrue(_resultSet.contains("ab"), "ab not found in result set");
+    assertTrue(_resultSet.contains("abba"), "abba not found in result set");
+    assertTrue(_resultSet.contains("aba"), "aba not found in result set");
+    assertTrue(_resultSet.contains("bab"), "bab not found in result set");
+    assertTrue(_resultSet.contains("cdd"), "cdd not found in result set");
+    assertTrue(_resultSet.contains("efg"), "efg not found in result set");
+  }
+
+  /**
+   * Runs two readers and two writers against the same index at the same time. Each reader polls for one of the
+   * two word lists while the writers add those lists; the test passes when all six words ended up in the shared
+   * result set.
+   */
+  @Test
+  public void testMayhem()
+      throws InterruptedException, IOException {
+    List<String> firstThreadWords = new ArrayList<>();
+    List<String> secondThreadWords = new ArrayList<>();
+    List<String> mergedThreadWords = new ArrayList<>();
+    // One count per submitted task: two readers plus two writers
+    CountDownLatch countDownLatch = new CountDownLatch(4);
+    firstThreadWords.add("ab");
+    firstThreadWords.add("abba");
+    firstThreadWords.add("aba");
+    secondThreadWords.add("bab");
+    secondThreadWords.add("cdd");
+    secondThreadWords.add("efg");
+
+    mergedThreadWords.addAll(firstThreadWords);
+    mergedThreadWords.addAll(secondThreadWords);
+
+    try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) {
+      _threadPool.submit(() -> {
+        try {
+          performReads(textIndex, firstThreadWords, 20, 200, countDownLatch);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag before rethrowing so the interruption is not lost
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      _threadPool.submit(() -> {
+        try {
+          performWrites(textIndex, secondThreadWords, 10, countDownLatch);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      _threadPool.submit(() -> {
+        try {
+          performReads(textIndex, secondThreadWords, 20, 200, countDownLatch);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      _threadPool.submit(() -> {
+        try {
+          performWrites(textIndex, firstThreadWords, 10, countDownLatch);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      // Keep the index open until every task has finished with it
+      countDownLatch.await();
+    }
+
+    assertEquals(_resultSet.size(), mergedThreadWords.size());
+
+    assertTrue(_resultSet.contains("ab"), "ab not found in result set");
+    assertTrue(_resultSet.contains("abba"), "abba not found in result set");
+    assertTrue(_resultSet.contains("aba"), "aba not found in result set");
+    assertTrue(_resultSet.contains("bab"), "bab not found in result set");
+    assertTrue(_resultSet.contains("cdd"), "cdd not found in result set");
+    assertTrue(_resultSet.contains("efg"), "efg not found in result set");
+  }
+
+  /**
+   * Polls the index for the given words and records each word in {@code _resultSet} once its lookup returns
+   * exactly one document. Polling stops after {@code count} rounds or as soon as every word has been recorded.
+   *
+   * NOTE(review): {@code _resultSet} is mutated here from pool threads and testMayhem runs two readers at once;
+   * confirm the set is safe for concurrent use.
+   *
+   * @param textIndex index to query
+   * @param words words this reader is waiting for
+   * @param count maximum number of polling rounds
+   * @param sleepTime pause between rounds, in milliseconds
+   * @param countDownLatch latch counted down once when this reader finishes, whether normally or exceptionally
+   * @throws InterruptedException if the thread is interrupted while sleeping between rounds
+   */
+  private void performReads(NativeMutableTextIndex textIndex, List<String> words, int count, long sleepTime,
+      CountDownLatch countDownLatch)
+      throws InterruptedException {
+
+    try {
+      for (int i = 0; i < count; i++) {
+        // Stop only when all of THIS reader's words are recorded. Comparing set size with words.size() is wrong
+        // when several readers with different word lists share the result set: a reader could quit early just
+        // because another reader had already recorded the same number of words.
+        if (_resultSet.containsAll(words)) {
+          break;
+        }
+
+        for (String currentWord : words) {
+          if (_resultSet.contains(currentWord)) {
+            continue;
+          }
+
+          if (textIndex.getDocIds(currentWord).getCardinality() == 1) {
+            _resultSet.add(currentWord);
+          }
+        }
+
+        Thread.sleep(sleepTime);
+      }
+    } finally {
+      // Always release the latch; otherwise a failed lookup would leave the test blocked in await() forever
+      countDownLatch.countDown();
+    }
+  }
+
+  /**
+   * Adds the given words to the index one by one, pausing after each add.
+   *
+   * @param textIndex index to write to
+   * @param words words to add, in order
+   * @param sleepTime pause after each add, in milliseconds
+   * @param countDownLatch latch counted down once when this writer finishes, whether normally or exceptionally
+   * @throws InterruptedException if the thread is interrupted while sleeping between adds
+   */
+  private void performWrites(NativeMutableTextIndex textIndex, List<String> words, long sleepTime,
+      CountDownLatch countDownLatch)
+      throws InterruptedException {
+
+    try {
+      for (String word : words) {
+        textIndex.add(word);
+
+        Thread.sleep(sleepTime);
+      }
+    } finally {
+      // Always release the latch; otherwise a failed add would leave the test blocked in await() forever
+      countDownLatch.countDown();
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexReaderWriterTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexReaderWriterTest.java
new file mode 100644
index 0000000000..bd03216c17
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexReaderWriterTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.IOException;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Adds a handful of values to a {@link NativeMutableTextIndex} and checks the document ids returned for
+ * patterns such as {@code hello.*} and {@code .*llo}.
+ */
+public class NativeMutableTextIndexReaderWriterTest {
+
+  @Test
+  public void testIndexWriterReader()
+      throws IOException {
+    String[] uniqueValues = {"hello-world", "hello-world123", "still", "zoobar"};
+
+    try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) {
+      for (String value : uniqueValues) {
+        textIndex.add(value);
+      }
+
+      // TestNG's Assert.assertEquals takes (actual, expected); keeping that order makes failure messages correct
+      int[] matchedDocIds = textIndex.getDocIds("hello.*").toArray();
+      assertEquals(matchedDocIds.length, 2);
+      assertEquals(matchedDocIds[0], 0);
+      assertEquals(matchedDocIds[1], 1);
+
+      matchedDocIds = textIndex.getDocIds(".*llo").toArray();
+      assertEquals(matchedDocIds.length, 2);
+      assertEquals(matchedDocIds[0], 0);
+      assertEquals(matchedDocIds[1], 1);
+
+      matchedDocIds = textIndex.getDocIds("wor.*").toArray();
+      assertEquals(matchedDocIds.length, 2);
+      assertEquals(matchedDocIds[0], 0);
+      assertEquals(matchedDocIds[1], 1);
+
+      matchedDocIds = textIndex.getDocIds("zoo.*").toArray();
+      assertEquals(matchedDocIds.length, 1);
+      assertEquals(matchedDocIds[0], 3);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java
index dee798f5dc..fa8f6e2b52 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java
@@ -37,14 +37,11 @@ import static org.testng.AssertJUnit.assertTrue;
 
 public class MutableFSTConcurrentTest {
   private ExecutorService _threadPool;
-
-  private CountDownLatch _countDownLatch;
   private Set<String> _resultSet;
 
   @BeforeClass
   private void setup() {
-     _threadPool = Executors.newFixedThreadPool(2);
-    _countDownLatch = new CountDownLatch(2);
+    _threadPool = Executors.newFixedThreadPool(2);
     _resultSet = new HashSet<>();
   }
 
@@ -58,6 +55,7 @@ public class MutableFSTConcurrentTest {
       throws InterruptedException {
     MutableFST mutableFST = new MutableFSTImpl();
     List<String> words = new ArrayList<>();
+    CountDownLatch countDownLatch = new CountDownLatch(2);
 
     words.add("ab");
     words.add("abba");
@@ -76,21 +74,21 @@ public class MutableFSTConcurrentTest {
 
     _threadPool.submit(() -> {
       try {
-        performReads(mutableFST, words, 10, 200);
+        performReads(mutableFST, words, 10, 200, countDownLatch);
       } catch (InterruptedException e) {
-        e.printStackTrace();
+        throw new RuntimeException(e);
       }
     });
 
     _threadPool.submit(() -> {
       try {
-        performWrites(mutableFST, wordsWithMetadata, 10);
+        performWrites(mutableFST, wordsWithMetadata, 10, countDownLatch);
       } catch (InterruptedException e) {
-        e.printStackTrace();
+        throw new RuntimeException(e);
       }
     });
 
-    _countDownLatch.await();
+    countDownLatch.await();
 
     assertEquals(_resultSet.size(), words.size());
 
@@ -102,8 +100,54 @@ public class MutableFSTConcurrentTest {
     assertTrue("efg not found in result set", _resultSet.contains("efg"));
   }
 
-  private void performReads(MutableFST fst, List<String> words, int count,
-      long sleepTime)
+  /**
+   * Has a writer add several long words without pausing while a reader polls for the last and longest of them,
+   * then checks that the reader recorded exactly that word.
+   */
+  @Test
+  public void testConcurrentLongWriteAndRead()
+      throws InterruptedException {
+    String longWord = "abacxcvbnmlkjjhgfsaqwertyuioopzxcvbnmllkjshfgsfawieeiuefgeurfeoafa";
+    MutableFST mutableFST = new MutableFSTImpl();
+    List<String> words = new ArrayList<>();
+    // One count per submitted task: one reader plus one writer
+    CountDownLatch countDownLatch = new CountDownLatch(2);
+
+    mutableFST.addPath("ab", 1);
+
+    List<Pair<String, Integer>> wordsWithMetadata = new ArrayList<>();
+
+    // Add some write pressure
+    wordsWithMetadata.add(Pair.of("egegdgrbsbrsegzgzegzegegjntnmtj", 2));
+    wordsWithMetadata.add(Pair.of("hrwbwefweg4wreghrtbrassregfesfefefefzew4ere", 2));
+    wordsWithMetadata.add(Pair.of("easzegfegrertegbxzzez3erfezgzeddzdewstfefed", 2));
+    wordsWithMetadata.add(Pair.of("tjntrhndsrsgezgrsxzetgteszetgezfzezedrefzdzdzdzdz", 2));
+    wordsWithMetadata.add(Pair.of(longWord, 2));
+
+    words.add(longWord);
+
+    try {
+      _threadPool.submit(() -> {
+        try {
+          performReads(mutableFST, words, 10, 10, countDownLatch);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag before rethrowing so the interruption is not lost
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      _threadPool.submit(() -> {
+        try {
+          performWrites(mutableFST, wordsWithMetadata, 0, countDownLatch);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      });
+
+      countDownLatch.await();
+
+      assertEquals(_resultSet.size(), words.size());
+
+      assertTrue(longWord + " not found in result set", _resultSet.contains(longWord));
+    } finally {
+      // _resultSet is a class-level field created once in setup(); clear it even when an assertion fails so a
+      // failure here cannot leak entries into the other tests
+      _resultSet.clear();
+    }
+  }
+
+  private void performReads(MutableFST fst, List<String> words, int count, 
long sleepTime,
+      CountDownLatch countDownLatch)
       throws InterruptedException {
 
     for (int i = 0; i < count; i++) {
@@ -126,11 +170,11 @@ public class MutableFSTConcurrentTest {
       Thread.sleep(sleepTime);
     }
 
-    _countDownLatch.countDown();
+    countDownLatch.countDown();
   }
 
-  private void performWrites(MutableFST fst, List<Pair<String, Integer>> 
wordsAndMetadata,
-      long sleepTime)
+  private void performWrites(MutableFST fst, List<Pair<String, Integer>> 
wordsAndMetadata, long sleepTime,
+      CountDownLatch countDownLatch)
       throws InterruptedException {
 
     for (int i = 0; i < wordsAndMetadata.size(); i++) {
@@ -141,6 +185,6 @@ public class MutableFSTConcurrentTest {
       Thread.sleep(sleepTime);
     }
 
-    _countDownLatch.countDown();
+    countDownLatch.countDown();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to