This is an automated email from the ASF dual-hosted git repository. atri pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 497d0517ab Introduce Real Time Native Text Index (#8636) 497d0517ab is described below commit 497d0517abfe62f7a95f9eb0602a2c5fa07dc531 Author: Atri Sharma <atri.j...@gmail.com> AuthorDate: Tue May 10 22:16:11 2022 +0530 Introduce Real Time Native Text Index (#8636) This commit introduces the real-time native text index. The text index is real time, i.e., data becomes available for search as soon as it comes in; no refresh is required. The index works with TEXT_CONTAINS. * Update pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeNativeTextIndex.java Co-authored-by: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> * Rebase * Checkstyle fixes * Add missing check * More review comments * Update tests * Move to reusable analyzer and refactor current realtime test * More comments and checkstyle * Add Missing File * Update per comments Co-authored-by: Atri Sharma <securonix@securonixs-MacBook-Pro.local> Co-authored-by: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> Co-authored-by: Atri Sharma <securonix@securonixs-MacBook-Pro-3.local> --- .../org/apache/pinot/core/plan/FilterPlanNode.java | 9 +- ...TextIndicesRealtimeClusterIntegrationTest.java} | 50 +++- .../indexsegment/mutable/MutableSegmentImpl.java | 25 +- .../impl/invertedindex/NativeMutableTextIndex.java | 123 ++++++++++ .../invertedindex/RealtimeLuceneTextIndex.java | 2 +- .../NativeAndLuceneMutableTextIndexTest.java | 107 +++++++++ .../NativeMutableTextIndexConcurrentTest.java | 251 +++++++++++++++++++++ .../NativeMutableTextIndexReaderWriterTest.java | 63 ++++++ .../mutablefst/MutableFSTConcurrentTest.java | 74 ++++-- 9 files changed, 664 insertions(+), 40 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java index 
6260085d96..7bc1a4f642 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java @@ -49,6 +49,7 @@ import org.apache.pinot.core.operator.filter.predicate.FSTBasedRegexpPredicateEv import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; import org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.datasource.DataSource; @@ -228,7 +229,7 @@ public class FilterPlanNode implements PlanNode { ExpressionContext lhs = predicate.getLhs(); if (lhs.getType() == ExpressionContext.Type.FUNCTION) { if (canApplyH3IndexForDistanceCheck(predicate, lhs.getFunction())) { - return new H3IndexFilterOperator(_indexSegment, predicate, numDocs); + return new H3IndexFilterOperator(_indexSegment, predicate, numDocs); } else if (canApplyH3IndexForInclusionCheck(predicate, lhs.getFunction())) { return new H3InclusionIndexFilterOperator(_indexSegment, predicate, numDocs); } else { @@ -246,14 +247,16 @@ public class FilterPlanNode implements PlanNode { switch (predicate.getType()) { case TEXT_CONTAINS: TextIndexReader textIndexReader = dataSource.getTextIndex(); - if (!(textIndexReader instanceof NativeTextIndexReader)) { + if (!(textIndexReader instanceof NativeTextIndexReader) + && !(textIndexReader instanceof NativeMutableTextIndex)) { throw new UnsupportedOperationException("TEXT_CONTAINS is supported only on native text index"); } return new TextContainsFilterOperator(textIndexReader, (TextContainsPredicate) predicate, numDocs); case TEXT_MATCH: textIndexReader = dataSource.getTextIndex(); // We could check for real time 
and segment Lucene reader, but easier to check the other way round - if (textIndexReader instanceof NativeTextIndexReader) { + if (textIndexReader instanceof NativeTextIndexReader + || textIndexReader instanceof NativeMutableTextIndex) { throw new UnsupportedOperationException("TEXT_MATCH is not supported on native text index"); } return new TextMatchFilterOperator(textIndexReader, (TextMatchPredicate) predicate, numDocs); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java similarity index 75% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java index fad182f957..7af80d0363 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java @@ -25,7 +25,9 @@ import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -46,19 +48,25 @@ import static org.testng.AssertJUnit.fail; /** - * Cluster integration test for near realtime text search + * Cluster integration test for near realtime text search (lucene) and realtime text search (native). 
*/ -public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest { +public class TextIndicesRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest { private static final String TEXT_COLUMN_NAME = "skills"; + private static final String TEXT_COLUMN_NAME_NATIVE = "skills_native"; private static final String TIME_COLUMN_NAME = "millisSinceEpoch"; private static final int NUM_SKILLS = 24; private static final int NUM_MATCHING_SKILLS = 4; private static final int NUM_RECORDS = NUM_SKILLS * 1000; private static final int NUM_MATCHING_RECORDS = NUM_MATCHING_SKILLS * 1000; + private static final int NUM_MATCHING_RECORDS_NATIVE = 7000; private static final String TEST_TEXT_COLUMN_QUERY = "SELECT COUNT(*) FROM mytable WHERE TEXT_MATCH(skills, '\"machine learning\" AND spark')"; + private static final String TEST_TEXT_COLUMN_QUERY_NATIVE = + "SELECT COUNT(*) FROM mytable WHERE TEXT_CONTAINS(skills_native, 'm.*') AND TEXT_CONTAINS(skills_native, " + + "'spark')"; + @Override public String getTimeColumnName() { return TIME_COLUMN_NAME; @@ -79,7 +87,7 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration @Nullable @Override protected List<String> getInvertedIndexColumns() { - return null; + return Collections.singletonList(TEXT_COLUMN_NAME_NATIVE); } @Override @@ -101,8 +109,13 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration @Override protected List<FieldConfig> getFieldConfigs() { - return Collections.singletonList( - new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null)); + Map<String, String> propertiesMap = new HashMap<>(); + propertiesMap.put(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL); + + return Arrays.asList( + new FieldConfig(TEXT_COLUMN_NAME, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null), + new FieldConfig(TEXT_COLUMN_NAME_NATIVE, FieldConfig.EncodingType.RAW, 
FieldConfig.IndexType.TEXT, null, + propertiesMap)); } @BeforeClass @@ -125,6 +138,7 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration // Create and upload the schema and table config Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME) .addSingleValueDimension(TEXT_COLUMN_NAME, FieldSpec.DataType.STRING) + .addSingleValueDimension(TEXT_COLUMN_NAME_NATIVE, FieldSpec.DataType.STRING) .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build(); addSchema(schema); addTableConfig(createRealtimeTableConfig(avroFile)); @@ -172,6 +186,8 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); avroSchema.setFields(Arrays.asList(new org.apache.avro.Schema.Field(TEXT_COLUMN_NAME, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null), + new org.apache.avro.Schema.Field(TEXT_COLUMN_NAME_NATIVE, + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null), new org.apache.avro.Schema.Field(TIME_COLUMN_NAME, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, null))); try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { @@ -179,6 +195,7 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration for (int i = 0; i < NUM_RECORDS; i++) { GenericData.Record record = new GenericData.Record(avroSchema); record.put(TEXT_COLUMN_NAME, skills.get(i % NUM_SKILLS)); + record.put(TEXT_COLUMN_NAME_NATIVE, skills.get(i % NUM_SKILLS)); record.put(TIME_COLUMN_NAME, System.currentTimeMillis()); fileWriter.append(record); } @@ -192,7 +209,7 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration // Keep posting queries until all records are consumed long previousResult = 0; while 
(getCurrentCountStarResult() < NUM_RECORDS) { - long result = getTextColumnQueryResult(); + long result = getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY); assertTrue(result >= previousResult); previousResult = result; Thread.sleep(100); @@ -201,7 +218,7 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration //Lucene index on consuming segments to update the latest records TestUtils.waitForCondition(aVoid -> { try { - return getTextColumnQueryResult() == NUM_MATCHING_RECORDS; + return getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY) == NUM_MATCHING_RECORDS; } catch (Exception e) { fail("Caught exception while getting text column query result"); return false; @@ -209,8 +226,23 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration }, 10_000L, "Failed to reach expected number of matching records"); } - private long getTextColumnQueryResult() + @Test + public void testTextSearchCountQueryNative() + throws Exception { + // Keep posting queries until all records are consumed + long previousResult = 0; + while (getCurrentCountStarResult() < NUM_RECORDS) { + long result = getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE); + assertTrue(result >= previousResult); + previousResult = result; + Thread.sleep(100); + } + + assertTrue(getTextColumnQueryResult(TEST_TEXT_COLUMN_QUERY_NATIVE) == NUM_MATCHING_RECORDS_NATIVE); + } + + private long getTextColumnQueryResult(String query) throws Exception { - return postQuery(TEST_TEXT_COLUMN_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(); + return postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index d88389eeff..ecab67ed5d 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -41,6 +41,7 @@ import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary; import org.apache.pinot.segment.local.realtime.impl.geospatial.MutableH3Index; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex; import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState; import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex; import org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector; @@ -300,20 +301,20 @@ public class MutableSegmentImpl implements MutableSegment { } if (useNativeTextIndex) { - // TODO: Add native text index support - _logger.warn("Mutable native text index is not supported, falling back to the Lucene text index"); - } + textIndex = new NativeMutableTextIndex(column); + } else { - // TODO - this logic is in the wrong place and belongs in a Lucene-specific submodule, - // it is beyond the scope of realtime index pluggability to do this refactoring, so realtime - // text indexes remain statically defined. Revisit this after this refactoring has been done. 
- RealtimeLuceneTextIndex luceneTextIndex = - new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName); - if (_realtimeLuceneReaders == null) { - _realtimeLuceneReaders = new RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName); + // TODO - this logic is in the wrong place and belongs in a Lucene-specific submodule, + // it is beyond the scope of realtime index pluggability to do this refactoring, so realtime + // text indexes remain statically defined. Revisit this after this refactoring has been done. + RealtimeLuceneTextIndex luceneTextIndex = + new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName); + if (_realtimeLuceneReaders == null) { + _realtimeLuceneReaders = new RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName); + } + _realtimeLuceneReaders.addReader(luceneTextIndex); + textIndex = luceneTextIndex; } - _realtimeLuceneReaders.addReader(luceneTextIndex); - textIndex = luceneTextIndex; } else { textIndex = null; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java new file mode 100644 index 0000000000..1f27f57b5a --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndex.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST; +import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFSTImpl; +import org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher; +import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class NativeMutableTextIndex implements MutableTextIndex { + private final String _column; + private final MutableFST _mutableFST; + private final RealtimeInvertedIndex _invertedIndex; + //TODO: Move to mutable dictionary + private final Object2IntOpenHashMap<String> _termToDictIdMapping; + private final ReentrantReadWriteLock.ReadLock _readLock; + private final ReentrantReadWriteLock.WriteLock _writeLock; + private final Analyzer _analyzer; + + private int _nextDocId = 0; + private int _nextDictId = 0; + + public 
NativeMutableTextIndex(String column) { + _column = column; + _mutableFST = new MutableFSTImpl(); + _termToDictIdMapping = new Object2IntOpenHashMap<>(); + _invertedIndex = new RealtimeInvertedIndex(); + + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + _readLock = readWriteLock.readLock(); + _writeLock = readWriteLock.writeLock(); + _analyzer = new StandardAnalyzer(LuceneTextIndexCreator.ENGLISH_STOP_WORDS_SET); + } + + @Override + public void add(String document) { + Iterable<String> tokens; + + tokens = analyze(document); + _writeLock.lock(); + try { + for (String token : tokens) { + Integer currentDictId = _termToDictIdMapping.computeIntIfAbsent(token, k -> { + int localDictId = _nextDictId++; + _mutableFST.addPath(token, localDictId); + return localDictId; + }); + _invertedIndex.add(currentDictId, _nextDocId); + } + _nextDocId++; + } finally { + _writeLock.unlock(); + } + } + + @Override + public ImmutableRoaringBitmap getDictIds(String searchQuery) { + throw new UnsupportedOperationException(); + } + + @Override + public MutableRoaringBitmap getDocIds(String searchQuery) { + MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap(); + _readLock.lock(); + try { + RealTimeRegexpMatcher.regexMatch(searchQuery, _mutableFST, + dictId -> matchingDocIds.or(_invertedIndex.getDocIds(dictId))); + return matchingDocIds; + } finally { + _readLock.unlock(); + } + } + + @Override + public void close() + throws IOException { + _analyzer.close(); + } + + private List<String> analyze(String document) { + List<String> tokens = new ArrayList<>(); + try (TokenStream tokenStream = _analyzer.tokenStream(_column, document)) { + tokenStream.reset(); + CharTermAttribute attribute = tokenStream.getAttribute(CharTermAttribute.class); + while (tokenStream.incrementToken()) { + tokens.add(attribute.toString()); + } + tokenStream.end(); + } catch (IOException e) { + throw new RuntimeException("Caught exception while tokenizing the document for column: " + 
_column, e); + } + return tokens; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java index 60e7b36c2c..c2a3d9c9e8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java @@ -156,7 +156,7 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { } } - SearcherManager getSearcherManager() { + public SearcherManager getSearcherManager() { return _searcherManager; } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java new file mode 100644 index 0000000000..b79482a1f0 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.lucene.search.SearcherManager; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class NativeAndLuceneMutableTextIndexTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RealTimeNativeVsLuceneTest"); + private static final String TEXT_COLUMN_NAME = "testColumnName"; + + private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; + private NativeMutableTextIndex _nativeMutableTextIndex; + + private List<String> getTextData() { + return Arrays.asList("Prince Andrew kept looking with an amused smile from Pierre", + "vicomte and from the vicomte to their hostess. In the first moment of", + "Pierre’s outburst Anna Pávlovna, despite her social experience, was", + "horror-struck. But when she saw that Pierre’s sacrilegious words", + "had not exasperated the vicomte, and had convinced herself that it was", + "impossible to stop him, she rallied her forces and joined the vicomte in", "a vigorous attack on the orator", + "horror-struck. 
But when she", "she rallied her forces and joined", "outburst Anna Pávlovna", + "she rallied her forces and", "despite her social experience", "had not exasperated the vicomte", + " despite her social experience", "impossible to stop him", "despite her social experience"); + } + + @BeforeClass + public void setUp() + throws Exception { + _realtimeLuceneTextIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, "fooBar"); + _nativeMutableTextIndex = new NativeMutableTextIndex(TEXT_COLUMN_NAME); + List<String> documents = getTextData(); + + for (String doc : documents) { + _realtimeLuceneTextIndex.add(doc); + _nativeMutableTextIndex.add(doc); + } + + SearcherManager searcherManager = _realtimeLuceneTextIndex.getSearcherManager(); + try { + searcherManager.maybeRefresh(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public void tearDown() { + _realtimeLuceneTextIndex.close(); + } + + @Test + public void testQueries() { + String nativeQuery = "vico.*"; + String luceneQuery = "vico*"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "vicomte"; + luceneQuery = "vicomte"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "s.*"; + luceneQuery = "s*"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "impossible"; + luceneQuery = "impossible"; + testSelectionResults(nativeQuery, luceneQuery); + + nativeQuery = "forc.*s"; + luceneQuery = "forc*s"; + testSelectionResults(nativeQuery, luceneQuery); + } + + private void testSelectionResults(String nativeQuery, String luceneQuery) { + MutableRoaringBitmap resultset = _realtimeLuceneTextIndex.getDocIds(luceneQuery); + MutableRoaringBitmap resultset2 = _nativeMutableTextIndex.getDocIds(nativeQuery); + + assertEquals(_nativeMutableTextIndex.getDocIds(nativeQuery), _realtimeLuceneTextIndex.getDocIds(luceneQuery)); + } +} diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexConcurrentTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexConcurrentTest.java new file mode 100644 index 0000000000..8082504dd3 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexConcurrentTest.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class NativeMutableTextIndexConcurrentTest { + private ExecutorService _threadPool; + private Set<String> _resultSet; + + @BeforeClass + private void setup() { + _threadPool = Executors.newFixedThreadPool(10); + _resultSet = new ConcurrentSkipListSet<>(); + } + + @AfterClass + private void shutDown() { + _threadPool.shutdown(); + } + + @Test + public void testConcurrentWriteAndRead() + throws InterruptedException, IOException { + CountDownLatch countDownLatch = new CountDownLatch(2); + List<String> words = new ArrayList<>(); + words.add("ab"); + words.add("abba"); + words.add("aba"); + words.add("bab"); + words.add("cdd"); + words.add("efg"); + + try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) { + _threadPool.submit(() -> { + try { + performReads(textIndex, words, 20, 200, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performWrites(textIndex, words, 10, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + countDownLatch.await(); + } + + assertEquals(_resultSet.size(), words.size()); + + assertTrue(_resultSet.contains("ab"), "ab not found in result set"); + assertTrue(_resultSet.contains("abba"), "abba not found in result set"); + assertTrue(_resultSet.contains("aba"), "aba not found in result set"); + 
assertTrue(_resultSet.contains("bab"), "bab not found in result set"); + assertTrue(_resultSet.contains("cdd"), "cdd not found in result set"); + assertTrue(_resultSet.contains("efg"), "efg not found in result set"); + } + + @Test + public void testConcurrentWriteWithMultipleThreads() + throws InterruptedException, IOException { + List<String> firstThreadWords = new ArrayList<>(); + List<String> secondThreadWords = new ArrayList<>(); + List<String> mergedThreadWords = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(3); + firstThreadWords.add("ab"); + firstThreadWords.add("abba"); + firstThreadWords.add("aba"); + secondThreadWords.add("bab"); + secondThreadWords.add("cdd"); + secondThreadWords.add("efg"); + + mergedThreadWords.addAll(firstThreadWords); + mergedThreadWords.addAll(secondThreadWords); + + try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) { + _threadPool.submit(() -> { + try { + performReads(textIndex, mergedThreadWords, 20, 200, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performWrites(textIndex, firstThreadWords, 10, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performWrites(textIndex, secondThreadWords, 10, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + countDownLatch.await(); + } + + assertEquals(_resultSet.size(), mergedThreadWords.size()); + + assertTrue(_resultSet.contains("ab"), "ab not found in result set"); + assertTrue(_resultSet.contains("abba"), "abba not found in result set"); + assertTrue(_resultSet.contains("aba"), "aba not found in result set"); + assertTrue(_resultSet.contains("bab"), "bab not found in result set"); + assertTrue(_resultSet.contains("cdd"), "cdd not found in result set"); + assertTrue(_resultSet.contains("efg"), "efg not 
found in result set"); + } + + @Test + public void testMayhem() + throws InterruptedException, IOException { + List<String> firstThreadWords = new ArrayList<>(); + List<String> secondThreadWords = new ArrayList<>(); + List<String> mergedThreadWords = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(4); + firstThreadWords.add("ab"); + firstThreadWords.add("abba"); + firstThreadWords.add("aba"); + secondThreadWords.add("bab"); + secondThreadWords.add("cdd"); + secondThreadWords.add("efg"); + + mergedThreadWords.addAll(firstThreadWords); + mergedThreadWords.addAll(secondThreadWords); + + try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) { + _threadPool.submit(() -> { + try { + performReads(textIndex, firstThreadWords, 20, 200, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performWrites(textIndex, secondThreadWords, 10, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performReads(textIndex, secondThreadWords, 20, 200, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performWrites(textIndex, firstThreadWords, 10, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + countDownLatch.await(); + } + + assertEquals(_resultSet.size(), mergedThreadWords.size()); + + assertTrue(_resultSet.contains("ab"), "ab not found in result set"); + assertTrue(_resultSet.contains("abba"), "abba not found in result set"); + assertTrue(_resultSet.contains("aba"), "aba not found in result set"); + assertTrue(_resultSet.contains("bab"), "bab not found in result set"); + assertTrue(_resultSet.contains("cdd"), "cdd not found in result set"); + assertTrue(_resultSet.contains("efg"), "efg not found in result set"); + } + + 
private void performReads(NativeMutableTextIndex textIndex, List<String> words, int count, long sleepTime, + CountDownLatch countDownLatch) + throws InterruptedException { + + for (int i = 0; i < count; i++) { + if (_resultSet.size() == words.size()) { + break; + } + + for (int j = 0; j < words.size(); j++) { + String currentWord = words.get(j); + + if (_resultSet.contains(currentWord)) { + continue; + } + + if (textIndex.getDocIds(words.get(j)).getCardinality() == 1) { + _resultSet.add(currentWord); + } + } + + Thread.sleep(sleepTime); + } + + countDownLatch.countDown(); + } + + private void performWrites(NativeMutableTextIndex textIndex, List<String> words, long sleepTime, + CountDownLatch countDownLatch) + throws InterruptedException { + + for (int i = 0; i < words.size(); i++) { + textIndex.add(words.get(i)); + + Thread.sleep(sleepTime); + } + + countDownLatch.countDown(); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexReaderWriterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexReaderWriterTest.java new file mode 100644 index 0000000000..bd03216c17 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableTextIndexReaderWriterTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.io.IOException; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class NativeMutableTextIndexReaderWriterTest { + + @Test + public void testIndexWriterReader() + throws IOException { + String[] uniqueValues = new String[4]; + uniqueValues[0] = "hello-world"; + uniqueValues[1] = "hello-world123"; + uniqueValues[2] = "still"; + uniqueValues[3] = "zoobar"; + + try (NativeMutableTextIndex textIndex = new NativeMutableTextIndex("testFSTColumn")) { + for (int i = 0; i < 4; i++) { + textIndex.add(uniqueValues[i]); + } + + int[] matchedDocIds = textIndex.getDocIds("hello.*").toArray(); + assertEquals(2, matchedDocIds.length); + assertEquals(0, matchedDocIds[0]); + assertEquals(1, matchedDocIds[1]); + + matchedDocIds = textIndex.getDocIds(".*llo").toArray(); + assertEquals(2, matchedDocIds.length); + assertEquals(0, matchedDocIds[0]); + assertEquals(1, matchedDocIds[1]); + + matchedDocIds = textIndex.getDocIds("wor.*").toArray(); + assertEquals(2, matchedDocIds.length); + assertEquals(0, matchedDocIds[0]); + assertEquals(1, matchedDocIds[1]); + + matchedDocIds = textIndex.getDocIds("zoo.*").toArray(); + assertEquals(1, matchedDocIds.length); + assertEquals(3, matchedDocIds[0]); + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java index dee798f5dc..fa8f6e2b52 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/nativefst/mutablefst/MutableFSTConcurrentTest.java @@ -37,14 +37,11 @@ import static org.testng.AssertJUnit.assertTrue; public class MutableFSTConcurrentTest { private ExecutorService _threadPool; - - private CountDownLatch _countDownLatch; private Set<String> _resultSet; @BeforeClass private void setup() { - _threadPool = Executors.newFixedThreadPool(2); - _countDownLatch = new CountDownLatch(2); + _threadPool = Executors.newFixedThreadPool(2); _resultSet = new HashSet<>(); } @@ -58,6 +55,7 @@ public class MutableFSTConcurrentTest { throws InterruptedException { MutableFST mutableFST = new MutableFSTImpl(); List<String> words = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(2); words.add("ab"); words.add("abba"); @@ -76,21 +74,21 @@ public class MutableFSTConcurrentTest { _threadPool.submit(() -> { try { - performReads(mutableFST, words, 10, 200); + performReads(mutableFST, words, 10, 200, countDownLatch); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException(e); } }); _threadPool.submit(() -> { try { - performWrites(mutableFST, wordsWithMetadata, 10); + performWrites(mutableFST, wordsWithMetadata, 10, countDownLatch); } catch (InterruptedException e) { - e.printStackTrace(); + throw new RuntimeException(e); } }); - _countDownLatch.await(); + countDownLatch.await(); assertEquals(_resultSet.size(), words.size()); @@ -102,8 +100,54 @@ public class MutableFSTConcurrentTest { assertTrue("efg not found in result set", _resultSet.contains("efg")); } - private void performReads(MutableFST fst, List<String> words, int count, - long sleepTime) + @Test + 
public void testConcurrentLongWriteAndRead() + throws InterruptedException { + MutableFST mutableFST = new MutableFSTImpl(); + List<String> words = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(2); + + mutableFST.addPath("ab", 1); + + List<Pair<String, Integer>> wordsWithMetadata = new ArrayList<>(); + + // Add some write pressure + wordsWithMetadata.add(Pair.of("egegdgrbsbrsegzgzegzegegjntnmtj", 2)); + wordsWithMetadata.add(Pair.of("hrwbwefweg4wreghrtbrassregfesfefefefzew4ere", 2)); + wordsWithMetadata.add(Pair.of("easzegfegrertegbxzzez3erfezgzeddzdewstfefed", 2)); + wordsWithMetadata.add(Pair.of("tjntrhndsrsgezgrsxzetgteszetgezfzezedrefzdzdzdzdz", 2)); + wordsWithMetadata.add(Pair.of("abacxcvbnmlkjjhgfsaqwertyuioopzxcvbnmllkjshfgsfawieeiuefgeurfeoafa", 2)); + + words.add("abacxcvbnmlkjjhgfsaqwertyuioopzxcvbnmllkjshfgsfawieeiuefgeurfeoafa"); + + _threadPool.submit(() -> { + try { + performReads(mutableFST, words, 10, 10, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + _threadPool.submit(() -> { + try { + performWrites(mutableFST, wordsWithMetadata, 0, countDownLatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + countDownLatch.await(); + + assertEquals(_resultSet.size(), words.size()); + + assertTrue("abacxcvbnmlkjjhgfsaqwertyuioopzxcvbnmllkjshfgsfawieeiuefgeurfeoafa not found in result set", + _resultSet.contains("abacxcvbnmlkjjhgfsaqwertyuioopzxcvbnmllkjshfgsfawieeiuefgeurfeoafa")); + + _resultSet.clear(); + } + + private void performReads(MutableFST fst, List<String> words, int count, long sleepTime, + CountDownLatch countDownLatch) throws InterruptedException { for (int i = 0; i < count; i++) { @@ -126,11 +170,11 @@ public class MutableFSTConcurrentTest { Thread.sleep(sleepTime); } - _countDownLatch.countDown(); + countDownLatch.countDown(); } - private void performWrites(MutableFST fst, List<Pair<String, Integer>> wordsAndMetadata, - long 
sleepTime) + private void performWrites(MutableFST fst, List<Pair<String, Integer>> wordsAndMetadata, long sleepTime, + CountDownLatch countDownLatch) throws InterruptedException { for (int i = 0; i < wordsAndMetadata.size(); i++) { @@ -141,6 +185,6 @@ public class MutableFSTConcurrentTest { Thread.sleep(sleepTime); } - _countDownLatch.countDown(); + countDownLatch.countDown(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org