This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5becf5b Allow MV Field Support For Raw Columns in Text Indices (#7638) 5becf5b is described below commit 5becf5b81c996a6be39a22fd99aacfa7f1b3e1ac Author: Atri Sharma <atri.j...@gmail.com> AuthorDate: Sat Oct 30 06:31:49 2021 +0530 Allow MV Field Support For Raw Columns in Text Indices (#7638) Using the new MV raw byte forward index, allow multi value fields to be supported in text indices. --- .../pinot/queries/TextSearchQueriesTest.java | 204 ++++++--------------- .../creator/impl/SegmentColumnarIndexCreator.java | 154 ++++++++-------- .../impl/inv/text/LuceneFSTIndexCreator.java | 5 + .../creator/impl/text/LuceneTextIndexCreator.java | 20 ++ .../local/segment/index/loader/LoaderUtils.java | 13 +- .../loader/invertedindex/TextIndexHandler.java | 85 ++++++--- .../utils/nativefst/NativeFSTIndexCreator.java | 5 + .../index/loader/SegmentPreProcessorTest.java | 41 +++-- .../resources/data/newColumnsSchemaWithText.json | 10 + .../spi/index/creator/TextIndexCreator.java | 5 + 10 files changed, 280 insertions(+), 262 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java index ef2f054..98f3bc5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java @@ -31,9 +31,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Set; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -96,14 +98,16 @@ public class TextSearchQueriesTest extends BaseQueriesTest { private static final String SKILLS_TEXT_COL_DICT_NAME = "SKILLS_TEXT_COL_DICT"; private static final String SKILLS_TEXT_COL_MULTI_TERM_NAME = "SKILLS_TEXT_COL_1"; private static final String SKILLS_TEXT_NO_RAW_NAME = "SKILLS_TEXT_COL_2"; + private static final String SKILLS_TEXT_MV_COL_NAME = "SKILLS_TEXT_MV_COL"; + private static final String SKILLS_TEXT_MV_COL_DICT_NAME = "SKILLS_TEXT_MV_COL_DICT"; private static final String INT_COL_NAME = "INT_COL"; - private static final List<String> RAW_TEXT_INDEX_COLUMNS = Arrays - .asList(QUERY_LOG_TEXT_COL_NAME, SKILLS_TEXT_COL_NAME, SKILLS_TEXT_COL_MULTI_TERM_NAME, SKILLS_TEXT_NO_RAW_NAME); - private static final List<String> DICT_TEXT_INDEX_COLUMNS = Arrays.asList(SKILLS_TEXT_COL_DICT_NAME); + private static final List<String> RAW_TEXT_INDEX_COLUMNS = + Arrays.asList(QUERY_LOG_TEXT_COL_NAME, SKILLS_TEXT_COL_NAME, SKILLS_TEXT_COL_MULTI_TERM_NAME, + SKILLS_TEXT_NO_RAW_NAME, SKILLS_TEXT_MV_COL_NAME); + private static final List<String> DICT_TEXT_INDEX_COLUMNS = + Arrays.asList(SKILLS_TEXT_COL_DICT_NAME, SKILLS_TEXT_MV_COL_DICT_NAME); private static final int INT_BASE_VALUE = 1000; - private final List<GenericRow> _rows = new ArrayList<>(); - private IndexSegment _indexSegment; private List<IndexSegment> _indexSegments; @@ -157,8 +161,8 @@ public class TextSearchQueriesTest extends BaseQueriesTest { List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_TEXT_INDEX_COLUMNS.size() + DICT_TEXT_INDEX_COLUMNS.size()); for (String textIndexColumn : RAW_TEXT_INDEX_COLUMNS) { - fieldConfigs - .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null)); + fieldConfigs.add( + new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW, FieldConfig.IndexType.TEXT, null, null)); } for (String textIndexColumn : DICT_TEXT_INDEX_COLUMNS) { fieldConfigs.add( @@ -174,6 +178,8 @@ public class TextSearchQueriesTest extends BaseQueriesTest { .addSingleValueDimension(SKILLS_TEXT_COL_DICT_NAME, FieldSpec.DataType.STRING) .addSingleValueDimension(SKILLS_TEXT_COL_MULTI_TERM_NAME, FieldSpec.DataType.STRING) .addSingleValueDimension(SKILLS_TEXT_NO_RAW_NAME, FieldSpec.DataType.STRING) + .addMultiValueDimension(SKILLS_TEXT_MV_COL_NAME, FieldSpec.DataType.STRING) + .addMultiValueDimension(SKILLS_TEXT_MV_COL_DICT_NAME, FieldSpec.DataType.STRING) .addMetric(INT_COL_NAME, FieldSpec.DataType.INT).build(); SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); config.setOutDir(INDEX_DIR.getPath()); @@ -197,24 +203,22 @@ public class TextSearchQueriesTest extends BaseQueriesTest { List<GenericRow> rows = new ArrayList<>(); // read the skills file - URL resourceUrl = getClass().getClassLoader().getResource("data/text_search_data/skills.txt"); - File skillFile = new File(resourceUrl.getFile()); String[] skills = new String[100]; + List<String[]> multiValueStringList = new ArrayList<>(); int skillCount = 0; - try (InputStream inputStream = new FileInputStream(skillFile); - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader( + Objects.requireNonNull(getClass().getClassLoader().getResourceAsStream("data/text_search_data/skills.txt"))))) { String line; while ((line = reader.readLine()) != null) { skills[skillCount++] = line; + multiValueStringList.add(StringUtils.splitByWholeSeparator(line, ", ")); } } // read the pql query log file (24k queries) and build dataset - resourceUrl = getClass().getClassLoader().getResource("data/text_search_data/pql_query1.txt"); - File logFile = new File(resourceUrl.getFile()); int counter = 0; - try (InputStream inputStream = new FileInputStream(logFile); - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(Objects.requireNonNull( + getClass().getClassLoader().getResourceAsStream("data/text_search_data/pql_query1.txt"))))) { String line; while ((line = reader.readLine()) != null) { GenericRow row = new GenericRow(); @@ -224,12 +228,16 @@ public class TextSearchQueriesTest extends BaseQueriesTest { row.putValue(SKILLS_TEXT_COL_NAME, "software engineering"); row.putValue(SKILLS_TEXT_COL_DICT_NAME, "software engineering"); row.putValue(SKILLS_TEXT_COL_MULTI_TERM_NAME, "software engineering"); - row.putValue(SKILLS_TEXT_COL_MULTI_TERM_NAME, "software engineering"); + row.putValue(SKILLS_TEXT_NO_RAW_NAME, "software engineering"); + row.putValue(SKILLS_TEXT_MV_COL_NAME, new String[]{"software", "engineering"}); + row.putValue(SKILLS_TEXT_MV_COL_DICT_NAME, new String[]{"software", "engineering"}); } else { row.putValue(SKILLS_TEXT_COL_NAME, skills[counter]); row.putValue(SKILLS_TEXT_COL_DICT_NAME, skills[counter]); row.putValue(SKILLS_TEXT_COL_MULTI_TERM_NAME, skills[counter]); row.putValue(SKILLS_TEXT_NO_RAW_NAME, skills[counter]); + row.putValue(SKILLS_TEXT_MV_COL_NAME, multiValueStringList.get(counter)); + row.putValue(SKILLS_TEXT_MV_COL_DICT_NAME, multiValueStringList.get(counter)); } rows.add(row); counter++; @@ -316,13 +324,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "management, docker image building and distribution" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Distributed systems\"') " - + "LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"Distributed systems\"", expected); // TEST 5: phrase query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase @@ -339,13 +341,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "management, docker image building and distribution" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"query processing\"') LIMIT" - + " 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"query processing\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"query processing\"", expected); // TEST 6: phrase query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain the phrase "machine @@ -395,13 +391,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "management, docker image building and distribution" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\"') LIMIT" - + " 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"Machine learning\"", expected); // TEST 7: composite phrase query using boolean operator AND // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain two independent phrases @@ -420,15 +410,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "performance scalable systems" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND " - + "\"Tensor flow\"') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND \"Tensor flow\"') " - + "LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"Machine learning\" AND \"Tensor flow\"", expected); // TEST 8: term query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain term 'Java'. @@ -481,11 +463,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "distributed storage, concurrency, multi-threading, apache airflow" }); - query = "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("Java", expected); // TEST 9: composite term query using BOOLEAN operator AND // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain two independent @@ -529,12 +507,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "distributed storage, concurrency, multi-threading, apache airflow" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java AND C++') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'Java AND C++') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("Java AND C++", expected); // TEST 10: phrase query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "Java C++" as is. @@ -563,12 +536,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "multi-threading, apache airflow" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Java C++\"') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Java C++\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"Java C++\"", expected); // TEST 11: composite phrase query using boolean operator AND. // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain two independent phrases @@ -581,15 +549,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "performance scalable systems" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND " - + "\"gpu processing\"') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND \"gpu " - + "processing\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"Machine learning\" AND \"gpu processing\"", expected); // TEST 12: composite phrase and term query using boolean operator AND. // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "machine learning" @@ -608,14 +568,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "performance scalable systems" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND " - + "gpu') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND gpu') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"Machine learning\" AND gpu", expected); // TEST 13: composite phrase and term query using boolean operator AND // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "machine learning" @@ -635,15 +588,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "performance scalable systems" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND gpu" - + " AND python') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"Machine learning\" AND gpu AND python') " - + "LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"Machine learning\" AND gpu AND python", expected); // TEST 14: term query // Search in SKILLS_TEXT_COL column to look for documents that MUST contain term 'apache'. The expected result @@ -678,11 +623,8 @@ public class TextSearchQueriesTest extends BaseQueriesTest { "Databases, columnar query processing, Apache Arrow, distributed systems, Machine learning, cluster " + "management, docker image building and distribution" }); - query = "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'apache') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'apache') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("apache", expected); // TEST 15: composite phrase and term query using boolean operator AND. // search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "distributed @@ -701,15 +643,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "management, docker image building and distribution" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND " - + "apache') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND apache') LIMIT " - + "50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"distributed systems\" AND apache", expected); // TEST 16: term query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain term 'database'. @@ -744,11 +678,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + " building large scale systems" }); - query = "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'database') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'database') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("database", expected); // TEST 17: phrase query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase "database engine" @@ -763,13 +693,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + " building large scale systems" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"database engine\"') LIMIT " - + "50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"database engine\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"database engine\"", expected); // TEST 18: phrase query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase @@ -790,13 +714,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + "multi-threading, C++," }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"publish subscribe\"') " - + "LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"publish subscribe\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"publish subscribe\"", expected); // TEST 19: phrase query // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain phrase @@ -805,14 +723,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { expected = new ArrayList<>(); expected.add(new Serializable[]{1000, "Accounts, Banking, Insurance, worked in NGO, Java"}); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"accounts banking " - + "insurance\"') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"accounts banking insurance\"') LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"accounts banking insurance\"", expected); // TEST 20: composite term query with boolean operator AND // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain terms 'accounts' @@ -826,15 +737,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { expected.add(new Serializable[]{1001, "Accounts, Banking, Finance, Insurance"}); expected.add(new Serializable[]{1002, "Accounts, Finance, Banking, Insurance"}); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'accounts AND banking AND " - + "insurance') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'accounts AND banking AND insurance') LIMIT " - + "50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("accounts AND banking AND insurance", expected); // TEST 21: composite phrase and term query using boolean operator AND. // Search in SKILLS_TEXT_COL column to look for documents where each document MUST contain ALL the following skills: @@ -853,14 +756,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest { + " concurrency, multi-threading, C++, CPU processing, Java" }); - query = - "SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND " - + "Java AND C++') LIMIT 50000"; - testTextSearchSelectQueryHelper(query, expected.size(), false, expected); - query = - "SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"distributed systems\" AND Java AND C++') " - + "LIMIT 50000"; - testTextSearchAggregationQueryHelper(query, expected.size()); + testSkillsColumn("\"distributed systems\" AND Java AND C++", expected); // test for the index configured to use AND as the default // conjunction operator @@ -1802,6 +1698,22 @@ public class TextSearchQueriesTest extends BaseQueriesTest { Assert.assertEquals(expectedCount, count); } + private void testSkillsColumn(String searchQuery, List<Serializable[]> expected) + throws Exception { + for (String skillColumn : Arrays.asList(SKILLS_TEXT_COL_NAME, SKILLS_TEXT_COL_DICT_NAME, + SKILLS_TEXT_COL_MULTI_TERM_NAME, SKILLS_TEXT_NO_RAW_NAME, SKILLS_TEXT_MV_COL_NAME, + SKILLS_TEXT_MV_COL_DICT_NAME)) { + String query = + String.format("SELECT INT_COL, SKILLS_TEXT_COL FROM MyTable WHERE TEXT_MATCH(%s, '%s') LIMIT 50000", + skillColumn, searchQuery); + testTextSearchSelectQueryHelper(query, expected.size(), false, expected); + + query = String.format("SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(%s, '%s') LIMIT 50000", skillColumn, + searchQuery); + testTextSearchAggregationQueryHelper(query, expected.size()); + } + } + @Test public void testInterSegment() { String query = diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index 0ea31d5..d2ac484 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -166,9 +166,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); for (String columnName : h3IndexConfigs.keySet()) { - Preconditions - .checkState(schema.hasColumn(columnName), "Cannot create H3 index for column: %s because it is not in schema", - columnName); + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create H3 index for column: %s because it is not in schema", columnName); } // Initialize creators for dictionary, forward index and inverted index @@ -206,8 +205,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { int cardinality = indexCreationInfo.getDistinctValueCount(); if (fieldSpec.isSingleValueField()) { if (indexCreationInfo.isSorted()) { - _forwardIndexCreatorMap - .put(columnName, new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality)); + _forwardIndexCreatorMap.put(columnName, + new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality)); } else { _forwardIndexCreatorMap.put(columnName, new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs)); @@ -221,8 +220,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { // Initialize inverted index creator; skip creating inverted index if sorted if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) { if (segmentCreationSpec.isOnHeap()) { - _invertedIndexCreatorMap - .put(columnName, new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality)); + _invertedIndexCreatorMap.put(columnName, + new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality)); } else { _invertedIndexCreatorMap.put(columnName, new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs, @@ -254,21 +253,19 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { if (textIndexColumns.contains(columnName)) { // Initialize text index creator - Preconditions.checkState(fieldSpec.isSingleValueField(), - "Text index is currently only supported on single-value columns"); - Preconditions - .checkState(storedType == DataType.STRING, "Text index is currently only supported on STRING type columns"); - _textIndexCreatorMap - .put(columnName, new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */)); + Preconditions.checkState(storedType == DataType.STRING, + "Text index is currently only supported on STRING type columns"); + _textIndexCreatorMap.put(columnName, + new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */)); } if (fstIndexColumns.contains(columnName)) { Preconditions.checkState(fieldSpec.isSingleValueField(), "FST index is currently only supported on single-value columns"); - Preconditions - .checkState(storedType == DataType.STRING, "FST index is currently only supported on STRING type columns"); - Preconditions - .checkState(dictEnabledColumn, "FST index is currently only supported on dictionary-encoded columns"); + Preconditions.checkState(storedType == DataType.STRING, + "FST index is currently only supported on STRING type columns"); + Preconditions.checkState(dictEnabledColumn, + "FST index is currently only supported on dictionary-encoded columns"); _fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName, (String[]) indexCreationInfo.getSortedUniqueElementsArray())); } @@ -276,8 +273,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { if (jsonIndexColumns.contains(columnName)) { Preconditions.checkState(fieldSpec.isSingleValueField(), "Json index is currently only supported on single-value columns"); - Preconditions - .checkState(storedType == DataType.STRING, "Json index is currently only supported on STRING columns"); + Preconditions.checkState(storedType == DataType.STRING, + "Json index is currently only supported on STRING columns"); JsonIndexCreator jsonIndexCreator = segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName) : new OffHeapJsonIndexCreator(_indexDir, columnName); @@ -286,8 +283,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName); if (h3IndexConfig != null) { - Preconditions - .checkState(fieldSpec.isSingleValueField(), "H3 index is currently only supported on single-value columns"); + Preconditions.checkState(fieldSpec.isSingleValueField(), + "H3 index is currently only supported on single-value columns"); Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns"); H3IndexResolution resolution = h3IndexConfig.getResolution(); GeoSpatialIndexCreator h3IndexCreator = @@ -308,8 +305,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { Map<String, Map<String, String>> columnProperties) { if (columnProperties != null) { Map<String, String> properties = columnProperties.get(columnName); - return properties != null && Boolean - .parseBoolean(properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY)); + return properties != null && Boolean.parseBoolean( + properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY)); } return false; } @@ -397,7 +394,22 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { // text-index TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName); if (textIndexCreator != null) { - textIndexCreator.add((String) columnValueToIndex); + if (fieldSpec.isSingleValueField()) { + textIndexCreator.add((String) columnValueToIndex); + } else { + Object[] values = (Object[]) columnValueToIndex; + int length = values.length; + if (values instanceof String[]) { + textIndexCreator.add((String[]) values, length); + } else { + String[] strings = new String[length]; + for (int i = 0; i < length; i++) { + strings[i] = (String) values[i]; + } + textIndexCreator.add(strings, length); + columnValueToIndex = strings; + } + } } if (fieldSpec.isSingleValueField()) { @@ -461,8 +473,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { //dictionary encoded int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); forwardIndexCreator.putDictIdMV(dictIds); - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap - .get(columnName); + DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); if (invertedIndexCreator != null) { invertedIndexCreator.add(dictIds, dictIds.length); } @@ -470,76 +481,69 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { // for text index on raw columns, check the config to determine if actual raw value should // be stored or not if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { - Object value = _columnProperties.get(columnName) - .get(FieldConfig.TEXT_INDEX_RAW_VALUE); + Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE); if (value == null) { value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; } if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) { - columnValueToIndex = new String[] {String.valueOf(value)}; + columnValueToIndex = new String[]{String.valueOf(value)}; } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) { - columnValueToIndex = new byte[][] {String.valueOf(value).getBytes(UTF_8)}; + columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)}; } else { throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type"); } } + Object[] values = (Object[]) columnValueToIndex; + int length = values.length; switch (forwardIndexCreator.getValueType()) { case INT: - if (columnValueToIndex instanceof Object[]) { - int[] array = new int[((Object[]) columnValueToIndex).length]; - for (int i = 0; i < array.length; i++) { - array[i] = (Integer) ((Object[]) columnValueToIndex)[i]; - } - forwardIndexCreator.putIntMV(array); + int[] ints = new int[length]; + for (int i = 0; i < length; i++) { + ints[i] = (Integer) values[i]; } + forwardIndexCreator.putIntMV(ints); break; case LONG: - if (columnValueToIndex instanceof Object[]) { - long[] array = new long[((Object[]) columnValueToIndex).length]; - for (int i = 0; i < array.length; i++) { - array[i] = (Long) ((Object[]) columnValueToIndex)[i]; - } - forwardIndexCreator.putLongMV(array); + long[] longs = new long[length]; + for (int i = 0; i < length; i++) { + longs[i] = (Long) values[i]; } + forwardIndexCreator.putLongMV(longs); break; case FLOAT: - if (columnValueToIndex instanceof Object[]) { - float[] array = new float[((Object[]) columnValueToIndex).length]; - for (int i = 0; i < array.length; i++) { - array[i] = (Float) ((Object[]) columnValueToIndex)[i]; - } - forwardIndexCreator.putFloatMV(array); + float[] floats = new float[length]; + for (int i = 0; i < length; i++) { + floats[i] = (Float) values[i]; } + forwardIndexCreator.putFloatMV(floats); break; case DOUBLE: - if (columnValueToIndex instanceof Object[]) { - double[] array = new double[((Object[]) columnValueToIndex).length]; - for (int i = 0; i < array.length; i++) { - array[i] = (Double) ((Object[]) columnValueToIndex)[i]; - } - forwardIndexCreator.putDoubleMV(array); + double[] doubles = new double[length]; + for (int i = 0; i < length; i++) { + doubles[i] = (Double) values[i]; } + forwardIndexCreator.putDoubleMV(doubles); break; case STRING: - if (columnValueToIndex instanceof String[]) { - forwardIndexCreator.putStringMV((String[]) columnValueToIndex); - } else if (columnValueToIndex instanceof Object[]) { - String[] array = new String[((Object[]) columnValueToIndex).length]; - for (int i = 0; i < array.length; i++) { - array[i] = (String) ((Object[]) columnValueToIndex)[i]; + if (values instanceof String[]) { + forwardIndexCreator.putStringMV((String[]) values); + } else { + String[] strings = new String[length]; + for (int i = 0; i < length; i++) { + strings[i] = (String) values[i]; } - forwardIndexCreator.putStringMV(array); + forwardIndexCreator.putStringMV(strings); } break; case BYTES: - if (columnValueToIndex instanceof byte[][]) { - forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex); - } else if (columnValueToIndex instanceof Object[]) { - byte[][] array = new byte[((Object[]) columnValueToIndex).length][]; - for (int i = 0; i < array.length; i++) { - array[i] = (byte[]) ((Object[]) columnValueToIndex)[i]; + if (values instanceof byte[][]) { + forwardIndexCreator.putBytesMV((byte[][]) values); + } else { + byte[][] bytesArray = new byte[length][]; + for (int i = 0; i < length; i++) { + bytesArray[i] = (byte[]) values[i]; } - forwardIndexCreator.putBytesMV(array); + forwardIndexCreator.putBytesMV(bytesArray); } break; default: @@ -740,8 +744,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements())); properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES), String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries())); - properties - .setProperty(getKeyFor(column, IS_AUTO_GENERATED), String.valueOf(columnIndexCreationInfo.isAutoGenerated())); + properties.setProperty(getKeyFor(column, IS_AUTO_GENERATED), + String.valueOf(columnIndexCreationInfo.isAutoGenerated())); PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction(); if (partitionFunction != null) { @@ -871,17 +875,15 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion, maxRowLengthInBytes, maxNumberOfMultiValueElements); default: - throw new UnsupportedOperationException( - "Data type not supported for raw indexing: " + dataType); + throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType); } } @Override public void close() throws IOException { - FileUtils.close(Iterables - .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), _invertedIndexCreatorMap.values(), - _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(), _jsonIndexCreatorMap.values(), - _h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values())); + FileUtils.close(Iterables.concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), + _invertedIndexCreatorMap.values(), _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(), + _jsonIndexCreatorMap.values(), _h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values())); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java index 5e78289..bb772ac 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java @@ -78,6 +78,11 @@ public class LuceneFSTIndexCreator implements TextIndexCreator { } @Override + public void add(String[] documents, int length) { + throw new UnsupportedOperationException("Multiple values not supported"); + } + + @Override public void seal() throws IOException { LOGGER.info("Sealing FST index: " + _fstIndexFile.getAbsolutePath()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java index aa4e9b5..a46c481 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/LuceneTextIndexCreator.java @@ -121,6 +121,26 @@ public class LuceneTextIndexCreator implements TextIndexCreator { } @Override + public void add(String[] documents, int length) { + Document docToIndex = new Document(); + + // Whenever multiple fields with the same name appear in one document, both the + // inverted index and term vectors will logically append the tokens of the + // field to one another, in the order the fields were added. + for (int i = 0; i < length; i++) { + docToIndex.add(new TextField(_textColumn, documents[i], Field.Store.NO)); + } + docToIndex.add(new StoredField(LUCENE_INDEX_DOC_ID_COLUMN_NAME, _nextDocId++)); + + try { + _indexWriter.addDocument(docToIndex); + } catch (Exception e) { + throw new RuntimeException( + "Caught exception while adding a new document to the Lucene index for column: " + _textColumn, e); + } + } + + @Override public void seal() { try { // Do this one time operation of combining the multiple lucene index files (if any) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java index a5b0843..b7083d0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java @@ -29,7 +29,9 @@ import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexCo import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl; import org.apache.pinot.segment.spi.ColumnMetadata; @@ -70,9 +72,14 @@ public class LoaderUtils { columnMetadata.getTotalNumberOfEntries(), columnMetadata.getBitsPerElement()); } } else { - DataType dataType = columnMetadata.getDataType(); - return dataType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, dataType) - : new VarByteChunkSVForwardIndexReader(dataBuffer, dataType); + DataType storedType = columnMetadata.getDataType().getStoredType(); + if (columnMetadata.isSingleValue()) { + return storedType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType) + : new VarByteChunkSVForwardIndexReader(dataBuffer, storedType); + } else { + return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType) + : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType); + } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java index 53134cb..c12308e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java @@ -37,6 +37,7 @@ package org.apache.pinot.segment.local.segment.index.loader.invertedindex; import java.io.File; +import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.commons.configuration.PropertiesConfiguration; @@ -45,10 +46,9 @@ import org.apache.pinot.segment.local.segment.index.loader.IndexHandler; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor; -import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; import org.apache.pinot.segment.spi.index.creator.TextIndexType; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.reader.Dictionary; @@ -123,9 +123,8 @@ public class TextIndexHandler implements IndexHandler { } /** - * Right now the text index is supported on RAW and dictionary encoded - * single-value STRING columns. Later we can add support for text index - * on multi-value columns and BYTE type columns + * Right now the text index is supported on STRING columns. + * Later we can add support for text index on BYTES columns * @param columnMetadata metadata for column */ private void checkUnsupportedOperationsForTextIndex(ColumnMetadata columnMetadata) { @@ -133,10 +132,6 @@ public class TextIndexHandler implements IndexHandler { if (columnMetadata.getDataType() != DataType.STRING) { throw new UnsupportedOperationException("Text index is currently only supported on STRING columns: " + column); } - if (!columnMetadata.isSingleValue()) { - throw new UnsupportedOperationException( - "Text index is currently not supported on multi-value columns: " + column); - } } private void createTextIndexForColumn(ColumnMetadata columnMetadata) @@ -156,24 +151,10 @@ public class TextIndexHandler implements IndexHandler { try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) { - if (!hasDictionary) { - // text index on raw column, just read the raw forward index - VarByteChunkSVForwardIndexReader rawIndexReader = (VarByteChunkSVForwardIndexReader) forwardIndexReader; - BaseChunkSVForwardIndexReader.ChunkReaderContext chunkReaderContext = - (BaseChunkSVForwardIndexReader.ChunkReaderContext) readerContext; - for (int docId = 0; docId < numDocs; docId++) { - textIndexCreator.add(rawIndexReader.getString(docId, chunkReaderContext)); - } + if (columnMetadata.isSingleValue()) { + processSVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata); } else { - // text index on dictionary encoded SV column - // read forward index to get dictId - // read the raw value from dictionary using dictId - try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) { - for (int docId = 0; docId < numDocs; docId++) { - int dictId = forwardIndexReader.getDictId(docId, readerContext); - textIndexCreator.add(dictionary.getStringValue(dictId)); - } - } + processMVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata); } textIndexCreator.seal(); } @@ -183,4 +164,56 @@ public class TextIndexHandler implements IndexHandler { properties.setProperty(getKeyFor(column, TEXT_INDEX_TYPE), TextIndexType.LUCENE.name()); properties.save(); } + + private void processSVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader, + ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs, + ColumnMetadata columnMetadata) + throws IOException { + if (!hasDictionary) { + // text index on raw column, just read the raw forward index + for (int docId = 0; docId < numDocs; docId++) { + textIndexCreator.add(forwardIndexReader.getString(docId, readerContext)); + } + } else { + // text index on dictionary encoded SV column + // read forward index to get dictId + // read the raw value from dictionary using dictId + try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) { + for (int docId = 0; docId < numDocs; docId++) { + int dictId = forwardIndexReader.getDictId(docId, readerContext); + textIndexCreator.add(dictionary.getStringValue(dictId)); + } + } + } + } + + private void processMVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader, + ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs, + ColumnMetadata columnMetadata) + throws IOException { + if (!hasDictionary) { + // text index on raw column, just read the raw forward index + String[] valueBuffer = new String[columnMetadata.getMaxNumberOfMultiValues()]; + for (int docId = 0; docId < numDocs; docId++) { + int length = forwardIndexReader.getStringMV(docId, valueBuffer, readerContext); + textIndexCreator.add(valueBuffer, length); + } + } else { + // text index on dictionary encoded MV column + // read forward index to get dictId + // read the raw value from dictionary using dictId + try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) { + int maxNumEntries = columnMetadata.getMaxNumberOfMultiValues(); + int[] dictIdBuffer = new int[maxNumEntries]; + String[] valueBuffer = new String[maxNumEntries]; + for (int docId = 0; docId < numDocs; docId++) { + int length = forwardIndexReader.getDictIdMV(docId, dictIdBuffer, readerContext); + for (int i = 0; i < length; i++) { + valueBuffer[i] = dictionary.getStringValue(dictIdBuffer[i]); + } + textIndexCreator.add(valueBuffer, length); + } + } + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java index a97bb80..678105a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/nativefst/NativeFSTIndexCreator.java @@ -66,6 +66,11 @@ public class NativeFSTIndexCreator implements TextIndexCreator { } @Override + public void add(String[] document, int length) { + throw new UnsupportedOperationException("Multiple values not supported"); + } + + @Override public void seal() throws IOException { LOGGER.info("Sealing FST index: " + _fstIndexFile.getAbsolutePath()); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index 2a16478..8a2fb82 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -85,6 +85,8 @@ public class SegmentPreProcessorTest { private static final String EXISTING_STRING_COL_DICT = "column5"; private static final String NEWLY_ADDED_STRING_COL_RAW = "newTextColRaw"; private static final String NEWLY_ADDED_STRING_COL_DICT = "newTextColDict"; + private static final String NEWLY_ADDED_STRING_MV_COL_RAW = "newTextMVColRaw"; + private static final String NEWLY_ADDED_STRING_MV_COL_DICT = "newTextMVColDict"; // For create fst index tests private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict"; @@ -214,8 +216,10 @@ public class SegmentPreProcessorTest { throws Exception { Set<String> textIndexColumns = new HashSet<>(); textIndexColumns.add(NEWLY_ADDED_STRING_COL_RAW); + textIndexColumns.add(NEWLY_ADDED_STRING_MV_COL_RAW); _indexLoadingConfig.setTextIndexColumns(textIndexColumns); _indexLoadingConfig.getNoDictionaryColumns().add(NEWLY_ADDED_STRING_COL_RAW); + _indexLoadingConfig.getNoDictionaryColumns().add(NEWLY_ADDED_STRING_MV_COL_RAW); // Create a segment in V3, add a new raw column with text index enabled constructV3Segment(); @@ -224,6 +228,8 @@ public class SegmentPreProcessorTest { // should be null since column does not exist in the schema assertNull(columnMetadata); checkTextIndexCreation(NEWLY_ADDED_STRING_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, true, 4); + checkTextIndexCreation(NEWLY_ADDED_STRING_MV_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, false, 4, false, + 1); // Create a segment in V1, add a new raw column with text index enabled constructV1Segment(); @@ -303,6 +309,7 @@ public class SegmentPreProcessorTest { throws Exception { Set<String> textIndexColumns = new HashSet<>(); textIndexColumns.add(NEWLY_ADDED_STRING_COL_DICT); + textIndexColumns.add(NEWLY_ADDED_STRING_MV_COL_DICT); _indexLoadingConfig.setTextIndexColumns(textIndexColumns); // Create a segment in V3, add a new dict encoded column with text index enabled @@ -313,6 +320,9 @@ public class SegmentPreProcessorTest { assertNull(columnMetadata); checkTextIndexCreation(NEWLY_ADDED_STRING_COL_DICT, 1, 1, _newColumnsSchemaWithText, true, true, true, 4); + checkTextIndexCreation(NEWLY_ADDED_STRING_MV_COL_DICT, 1, 1, _newColumnsSchemaWithText, true, true, false, 4, false, + 1); + // Create a segment in V1, add a new dict encoded column with text index enabled constructV1Segment(); segmentMetadata = new SegmentMetadataImpl(_indexDir); @@ -385,18 +395,27 @@ public class SegmentPreProcessorTest { boolean isSorted, int dictionaryElementSize) throws Exception { checkIndexCreation(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true, isSorted, - dictionaryElementSize); + dictionaryElementSize, true, 0); } private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize) throws Exception { checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary, - isSorted, dictionaryElementSize); + isSorted, dictionaryElementSize, true, 0); + } + + private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated, + boolean hasDictionary, boolean isSorted, int dictionaryElementSize, boolean isSingleValue, + int maxNumberOfMultiValues) + throws Exception { + checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary, + isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues); } private void checkIndexCreation(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema, - boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize) + boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize, + boolean isSingleValued, int maxNumberOfMultiValues) throws Exception { try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getLocalSegmentDirectoryLoader() @@ -405,14 +424,14 @@ public class SegmentPreProcessorTest { processor.process(); SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir); ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); - assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, true)); + assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, isSingleValued)); assertEquals(columnMetadata.getCardinality(), cardinality); assertEquals(columnMetadata.getTotalDocs(), 100000); assertEquals(columnMetadata.getBitsPerElement(), bits); assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize); assertEquals(columnMetadata.isSorted(), isSorted); assertEquals(columnMetadata.hasDictionary(), hasDictionary); - assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 0); + assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues); assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000); assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated); @@ -761,8 +780,8 @@ public class SegmentPreProcessorTest { Iterator<String> keys = configuration.getKeys(); while (keys.hasNext()) { String key = keys.next(); - if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key - .endsWith(V1Constants.MetadataKeys.Column.MAX_VALUE)) { + if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key.endsWith( + V1Constants.MetadataKeys.Column.MAX_VALUE)) { configuration.clearProperty(key); } } @@ -970,8 +989,8 @@ public class SegmentPreProcessorTest { assertNotNull(segmentMetadata.getColumnMetadataFor("newJsonCol")); _indexLoadingConfig = new IndexLoadingConfig(); - _indexLoadingConfig - .setH3IndexConfigs(ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))); + _indexLoadingConfig.setH3IndexConfigs( + ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))); _indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol"))); // V1 use separate file for each column index. @@ -1025,8 +1044,8 @@ public class SegmentPreProcessorTest { long initFileSize = singleFileIndex.length(); _indexLoadingConfig = new IndexLoadingConfig(); - _indexLoadingConfig - .setH3IndexConfigs(ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))); + _indexLoadingConfig.setH3IndexConfigs( + ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))); _indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol"))); // Create H3 and Json indices. diff --git a/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json index b2975d4..dda33ba 100644 --- a/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json +++ b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithText.json @@ -30,6 +30,16 @@ "dataType": "STRING" }, { + "name": "newTextMVColRaw", + "dataType": "STRING", + "singleValueField": false + }, + { + "name": "newTextMVColDict", + "dataType": "STRING", + "singleValueField": false + }, + { "name": "column6", "dataType": "INT", "singleValueField": false diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java index 4a9d6cf..bad466e 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java @@ -33,6 +33,11 @@ public interface TextIndexCreator extends Closeable { void add(String document); /** + * Adds a set of documents to the index + */ + void add(String[] document, int length); + + /** * Seals the index and flushes it to disk. */ void seal() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org