This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6dd84b52d52 Add bad data handling for some IndexCreator::add() functions to skip record or add dummy record (#16094) 6dd84b52d52 is described below commit 6dd84b52d528526a45f783aa34414be5a4dee71f Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Mon Jul 14 17:23:44 2025 -0700 Add bad data handling for some IndexCreator::add() functions to skip record or add dummy record (#16094) * Add bad data handling for some IndexCreator::add() functions to skip record or add dummy record * Add tableNameWithType to update the metric with table name included * Address review comments * Use continueOnError flag for FST and native text index creators * Compared returned flattened record to default JsonUtils.SKIPPED_FLATTENED_RECORD and update metric if they are equal * Use _continueOnError even for JSON index * Update H3 index creator to use continueOnError flag * Address review comments --- .../accounting/ResourceManagerAccountingTest.java | 3 +- .../impl/inv/geospatial/BaseH3IndexCreator.java | 19 ++-- .../impl/inv/geospatial/OffHeapH3IndexCreator.java | 5 +- .../impl/inv/geospatial/OnHeapH3IndexCreator.java | 5 +- .../impl/inv/json/BaseJsonIndexCreator.java | 51 +++++++++- .../impl/inv/json/OffHeapJsonIndexCreator.java | 5 +- .../impl/inv/json/OnHeapJsonIndexCreator.java | 5 +- .../impl/inv/text/LuceneFSTIndexCreator.java | 55 +++++++++-- .../creator/impl/text/NativeTextIndexCreator.java | 39 +++++++- .../local/segment/index/h3/H3IndexType.java | 4 +- .../local/segment/index/json/JsonIndexType.java | 5 +- .../local/segment/index/text/TextIndexType.java | 3 +- .../pinot/segment/local/utils/MetricUtils.java | 37 +++---- .../segment/local/segment/index/H3IndexTest.java | 64 +++++++++++- .../segment/local/segment/index/JsonIndexTest.java | 89 +++++++++++++++-- .../index/creator/HnswVectorIndexCreatorTest.java | 95 ++++++++++++++++++ .../index/creator/LuceneFSTIndexCreatorTest.java | 72 ++++++++++++-- 
.../index/creator/LuceneTextIndexCreatorTest.java | 107 +++++++++++++++++++++ .../index/creator/NativeTextIndexCreatorTest.java | 87 ++++++++++++++++- .../segment/store/FilePerIndexDirectoryTest.java | 2 +- .../segment/spi/index/creator/FSTIndexCreator.java | 3 +- .../spi/index/creator/JsonIndexCreator.java | 9 +- .../java/org/apache/pinot/spi/utils/JsonUtils.java | 6 +- 23 files changed, 680 insertions(+), 90 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java index b72cc621a97..38ab43ba220 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java @@ -520,7 +520,8 @@ public class ResourceManagerAccountingTest { File indexDir = new File(FileUtils.getTempDirectory(), "testJsonIndexExtractMapOOM"); FileUtils.forceMkdir(indexDir); String colName = "col"; - try (JsonIndexCreator offHeapIndexCreator = new OffHeapJsonIndexCreator(indexDir, colName, new JsonIndexConfig()); + try (JsonIndexCreator offHeapIndexCreator = new OffHeapJsonIndexCreator(indexDir, colName, "myTable_OFFLINE", + false, new JsonIndexConfig()); MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new JsonIndexConfig(), "table__0__1", "col")) { // build json indexes for (int i = 0; i < 1000000; i++) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java index b3861073619..888d5cbb004 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java +++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial; +import com.google.common.base.Preconditions; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.File; @@ -26,16 +27,14 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.Locale; import java.util.Map; import java.util.TreeMap; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.metrics.ServerMeter; -import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.segment.index.h3.H3IndexType; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.local.utils.H3Utils; +import org.apache.pinot.segment.local.utils.MetricUtils; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; import org.apache.pinot.segment.spi.index.reader.H3IndexResolution; @@ -72,6 +71,7 @@ public abstract class BaseH3IndexCreator implements GeoSpatialIndexCreator { static final String BITMAP_VALUE_FILE_NAME = "bitmap.value.buf"; final String _tableNameWithType; + final boolean _continueOnError; final File _indexFile; final File _tempDir; final File _dictionaryFile; @@ -88,9 +88,11 @@ public abstract class BaseH3IndexCreator implements GeoSpatialIndexCreator { int _nextDocId; - BaseH3IndexCreator(File indexDir, String columnName, String tableNameWithType, H3IndexResolution resolution) + BaseH3IndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + H3IndexResolution resolution) throws IOException { _tableNameWithType = tableNameWithType; + _continueOnError = continueOnError; _indexFile = new File(indexDir, columnName 
+ V1Constants.Indexes.H3_INDEX_FILE_EXTENSION); _tempDir = new File(indexDir, columnName + TEMP_DIR_SUFFIX); if (_tempDir.exists()) { @@ -116,13 +118,14 @@ public abstract class BaseH3IndexCreator implements GeoSpatialIndexCreator { @Override public void add(@Nullable Geometry geometry) throws IOException { - if (geometry == null || !(geometry instanceof Point)) { - String metricKeyName = - _tableNameWithType + "-" + H3IndexType.INDEX_DISPLAY_NAME.toUpperCase(Locale.US) + "-indexingError"; - ServerMetrics.get().addMeteredTableValue(metricKeyName, ServerMeter.INDEXING_FAILURES, 1); + if (_continueOnError && !(geometry instanceof Point)) { + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, H3IndexType.INDEX_DISPLAY_NAME); _nextDocId++; return; } + Preconditions.checkState(geometry != null, "Null geometry record found and continueOnError is disabled"); + Preconditions.checkState(geometry instanceof Point, "H3 index can only be applied to Point, got: %s", + geometry.getGeometryType()); Coordinate coordinate = geometry.getCoordinate(); // TODO: support multiple resolutions long h3Id = H3Utils.H3_CORE.latLngToCell(coordinate.y, coordinate.x, _lowestResolution); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java index 35240582ca0..72fb3ddf066 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java @@ -61,9 +61,10 @@ public class OffHeapH3IndexCreator extends BaseH3IndexCreator { private long _postingListChunkOffset; - public OffHeapH3IndexCreator(File indexDir, String columnName, String tableNameWithType, H3IndexResolution 
resolution) + public OffHeapH3IndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + H3IndexResolution resolution) throws IOException { - super(indexDir, columnName, tableNameWithType, resolution); + super(indexDir, columnName, tableNameWithType, continueOnError, resolution); _postingListFile = new File(_tempDir, POSTING_LIST_FILE_NAME); _postingListOutputStream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(_postingListFile))); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java index c9583a45d75..c02b83e1a17 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java @@ -34,9 +34,10 @@ import org.roaringbitmap.RoaringBitmapWriter; */ public class OnHeapH3IndexCreator extends BaseH3IndexCreator { - public OnHeapH3IndexCreator(File indexDir, String columnName, String tableNameWithType, H3IndexResolution resolution) + public OnHeapH3IndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + H3IndexResolution resolution) throws IOException { - super(indexDir, columnName, tableNameWithType, resolution); + super(indexDir, columnName, tableNameWithType, continueOnError, resolution); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java index 5c3409fe45f..91479775b9f 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.inv.json; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; @@ -31,6 +32,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.index.json.JsonIndexType; +import org.apache.pinot.segment.local.utils.MetricUtils; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; import org.apache.pinot.segment.spi.memory.CleanerUtil; @@ -62,6 +65,8 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator { static final String DICTIONARY_FILE_NAME = "dictionary.buf"; static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf"; + final String _tableNameWithType; + final boolean _continueOnError; final JsonIndexConfig _jsonIndexConfig; final File _indexFile; final File _tempDir; @@ -74,8 +79,11 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator { int _nextFlattenedDocId; int _maxValueLength; - BaseJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig jsonIndexConfig) + BaseJsonIndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + JsonIndexConfig jsonIndexConfig) throws IOException { + _tableNameWithType = tableNameWithType; + _continueOnError = continueOnError; _jsonIndexConfig = jsonIndexConfig; _indexFile = new File(indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); _tempDir = new File(indexDir, columnName + 
TEMP_DIR_SUFFIX); @@ -91,7 +99,46 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator { @Override public void add(String jsonString) throws IOException { - addFlattenedRecords(JsonUtils.flatten(jsonString, _jsonIndexConfig)); + List<Map<String, String>> flattenedRecord; + try { + flattenedRecord = JsonUtils.flatten(jsonString, _jsonIndexConfig); + if (flattenedRecord == JsonUtils.SKIPPED_FLATTENED_RECORD) { + // The default SKIPPED_FLATTENED_RECORD was returned, this can only happen if the original record could not be + // flattened, update the metric + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, JsonIndexType.INDEX_DISPLAY_NAME); + } + } catch (Exception e) { + if (_continueOnError) { + // Caught exception while trying to add, update metric and add a default SKIPPED_FLATTENED_RECORD + // This check is needed in the case where `_jsonIndexConfig.getSkipInvalidJson()` is false, + // but _continueOnError is true + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, JsonIndexType.INDEX_DISPLAY_NAME); + flattenedRecord = JsonUtils.SKIPPED_FLATTENED_RECORD; + } else { + throw e; + } + } + addFlattenedRecords(flattenedRecord); + } + + @Override + public void add(Map value) + throws IOException { + String valueToAdd; + try { + // TODO: Avoid this ser/de from map -> string -> json node + valueToAdd = JsonUtils.objectToString(value); + } catch (JsonProcessingException e) { + if (_jsonIndexConfig.getSkipInvalidJson() || _continueOnError) { + // Caught exception while trying to add, update metric and add a default SKIPPED_FLATTENED_RECORD + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, JsonIndexType.INDEX_DISPLAY_NAME); + addFlattenedRecords(JsonUtils.SKIPPED_FLATTENED_RECORD); + return; + } else { + throw e; + } + } + add(valueToAdd); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java index 30c5d2b63b3..7b99072124d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java @@ -70,9 +70,10 @@ public class OffHeapJsonIndexCreator extends BaseJsonIndexCreator { private int _numPostingListsInLastChunk; private int _numPostingLists; - public OffHeapJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig jsonIndexConfig) + public OffHeapJsonIndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + JsonIndexConfig jsonIndexConfig) throws IOException { - super(indexDir, columnName, jsonIndexConfig); + super(indexDir, columnName, tableNameWithType, continueOnError, jsonIndexConfig); _postingListFile = new File(_tempDir, POSTING_LIST_FILE_NAME); _postingListOutputStream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(_postingListFile))); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java index bb45728c3f4..f09b8b717c5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java @@ -39,9 +39,10 @@ import static java.nio.charset.StandardCharsets.UTF_8; */ public class OnHeapJsonIndexCreator extends BaseJsonIndexCreator { - public OnHeapJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig jsonIndexConfig) + public 
OnHeapJsonIndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + JsonIndexConfig jsonIndexConfig) throws IOException { - super(indexDir, columnName, jsonIndexConfig); + super(indexDir, columnName, tableNameWithType, continueOnError, jsonIndexConfig); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java index c490a79db10..c410003d1b0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java @@ -18,11 +18,14 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.inv.text; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import org.apache.lucene.store.OutputStreamDataOutput; import org.apache.lucene.util.fst.FST; +import org.apache.pinot.segment.local.segment.index.fst.FstIndexType; +import org.apache.pinot.segment.local.utils.MetricUtils; import org.apache.pinot.segment.local.utils.fst.FSTBuilder; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.IndexCreationContext; @@ -40,6 +43,9 @@ import org.slf4j.LoggerFactory; public class LuceneFSTIndexCreator implements FSTIndexCreator { private static final Logger LOGGER = LoggerFactory.getLogger(LuceneFSTIndexCreator.class); private final File _fstIndexFile; + private final String _columnName; + private final String _tableNameWithType; + private final boolean _continueOnError; private final FSTBuilder _fstBuilder; Integer _dictId; @@ -50,36 +56,69 @@ public class LuceneFSTIndexCreator implements FSTIndexCreator { * * 
@param indexDir Index directory * @param columnName Column name for which index is being created + * @param tableNameWithType table name with type + * @param continueOnError if true, don't throw exception on add() failures * @param sortedEntries Sorted entries of the unique values of the column. * @throws IOException */ - public LuceneFSTIndexCreator(File indexDir, String columnName, String[] sortedEntries) + public LuceneFSTIndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + String[] sortedEntries) throws IOException { + this(indexDir, columnName, tableNameWithType, continueOnError, sortedEntries, new FSTBuilder()); + } + + @VisibleForTesting + public LuceneFSTIndexCreator(File indexDir, String columnName, String tableNameWithType, boolean continueOnError, + String[] sortedEntries, FSTBuilder fstBuilder) + throws IOException { + _columnName = columnName; + _tableNameWithType = tableNameWithType; + _continueOnError = continueOnError; _fstIndexFile = new File(indexDir, columnName + V1Constants.Indexes.LUCENE_V912_FST_INDEX_FILE_EXTENSION); - _fstBuilder = new FSTBuilder(); + _fstBuilder = fstBuilder; _dictId = 0; if (sortedEntries != null) { for (_dictId = 0; _dictId < sortedEntries.length; _dictId++) { - _fstBuilder.addEntry(sortedEntries[_dictId], _dictId); + try { + _fstBuilder.addEntry(sortedEntries[_dictId], _dictId); + } catch (Exception ex) { + if (_continueOnError) { + // Caught exception while trying to add, update metric and skip the document + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, FstIndexType.INDEX_DISPLAY_NAME); + } else { + LOGGER.error("Caught exception while trying to add to FST index for table: {}, column: {}", + tableNameWithType, columnName, ex); + throw ex; + } + } } } } public LuceneFSTIndexCreator(IndexCreationContext context) throws IOException { - this(context.getIndexDir(), context.getFieldSpec().getName(), (String[]) context.getSortedUniqueElementsArray()); + 
this(context.getIndexDir(), context.getFieldSpec().getName(), context.getTableNameWithType(), + context.isContinueOnError(), (String[]) context.getSortedUniqueElementsArray()); } // Expects dictionary entries in sorted order. @Override - public void add(String document) { + public void add(String document) + throws IOException { try { _fstBuilder.addEntry(document, _dictId); - _dictId++; - } catch (IOException ex) { - throw new RuntimeException("Unable to load the schema file", ex); + } catch (Exception ex) { + if (_continueOnError) { + // Caught exception while trying to add, update metric and skip the document + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, FstIndexType.INDEX_DISPLAY_NAME); + } else { + LOGGER.error("Caught exception while trying to add to FST index for table: {}, column: {}", + _tableNameWithType, _columnName, ex); + throw ex; + } } + _dictId++; } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java index 7ef4d252143..dab54fb6408 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java @@ -35,6 +35,8 @@ import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter; import org.apache.pinot.segment.local.segment.index.text.AbstractTextIndexCreator; import org.apache.pinot.segment.local.segment.index.text.CaseAwareStandardAnalyzer; +import org.apache.pinot.segment.local.segment.index.text.TextIndexType; +import org.apache.pinot.segment.local.utils.MetricUtils; import org.apache.pinot.segment.local.utils.nativefst.FST; import 
org.apache.pinot.segment.local.utils.nativefst.FSTHeader; import org.apache.pinot.segment.local.utils.nativefst.builder.FSTBuilder; @@ -42,11 +44,14 @@ import org.apache.pinot.segment.spi.V1Constants; import org.roaringbitmap.Container; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.RoaringBitmapWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.nio.charset.StandardCharsets.UTF_8; public class NativeTextIndexCreator extends AbstractTextIndexCreator { + private static final Logger LOGGER = LoggerFactory.getLogger(NativeTextIndexCreator.class); private static final String TEMP_DIR_SUFFIX = ".nativetext.idx.tmp"; private static final String FST_FILE_NAME = "native.fst"; private static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf"; @@ -62,6 +67,8 @@ public class NativeTextIndexCreator extends AbstractTextIndexCreator { public static final int VERSION = 1; private final String _columnName; + private final String _tableNameWithType; + private final boolean _continueOnError; private final FSTBuilder _fstBuilder; private final File _indexFile; private final File _tempDir; @@ -74,9 +81,11 @@ public class NativeTextIndexCreator extends AbstractTextIndexCreator { private int _fstDataSize; private int _numBitMaps; - public NativeTextIndexCreator(String column, File indexDir) + public NativeTextIndexCreator(String column, String tableNameWithType, boolean continueOnError, File indexDir) throws IOException { _columnName = column; + _tableNameWithType = tableNameWithType; + _continueOnError = continueOnError; _fstBuilder = new FSTBuilder(); _indexFile = new File(indexDir, column + V1Constants.Indexes.NATIVE_TEXT_INDEX_FILE_EXTENSION); _tempDir = new File(indexDir, column + TEMP_DIR_SUFFIX); @@ -92,14 +101,36 @@ public class NativeTextIndexCreator extends AbstractTextIndexCreator { @Override public void add(String document) { - addHelper(document); + try { + addHelper(document); + } catch (RuntimeException e) { + 
if (_continueOnError) { + // Caught exception while trying to add, update metric and skip the document + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, TextIndexType.INDEX_DISPLAY_NAME); + } else { + LOGGER.error("Caught exception while trying to add to native text index for table: {}, column: {}", + _tableNameWithType, _columnName, e); + throw e; + } + } _nextDocId++; } @Override public void add(String[] documents, int length) { - for (int i = 0; i < length; i++) { - addHelper(documents[i]); + try { + for (int i = 0; i < length; i++) { + addHelper(documents[i]); + } + } catch (RuntimeException e) { + if (_continueOnError) { + // Caught exception while trying to add, update metric and skip the document + MetricUtils.updateIndexingErrorMetric(_tableNameWithType, TextIndexType.INDEX_DISPLAY_NAME); + } else { + LOGGER.error("Caught exception while trying to add to native text index for table: {}, column: {}", + _tableNameWithType, _columnName, e); + throw e; + } } _nextDocId++; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java index 5d0996ef746..194ae655f58 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java @@ -109,9 +109,9 @@ public class H3IndexType extends AbstractIndexType<H3IndexConfig, H3IndexReader, H3IndexResolution resolution = Objects.requireNonNull(indexConfig).getResolution(); return context.isOnHeap() ? 
new OnHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), - context.getTableNameWithType(), resolution) + context.getTableNameWithType(), context.isContinueOnError(), resolution) : new OffHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), - context.getTableNameWithType(), resolution); + context.getTableNameWithType(), context.isContinueOnError(), resolution); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java index 4943e7d0c3f..17ded67a0e0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java @@ -115,8 +115,9 @@ public class JsonIndexType extends AbstractIndexType<JsonIndexConfig, JsonIndexR Preconditions.checkState(storedType == DataType.STRING || storedType == DataType.MAP, "Json index is currently only supported on STRING columns"); return context.isOnHeap() ? 
new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), - indexConfig) - : new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), indexConfig); + context.getTableNameWithType(), context.isContinueOnError(), indexConfig) + : new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), + context.getTableNameWithType(), context.isContinueOnError(), indexConfig); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java index dfe8616bd05..3c36ae2ac63 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java @@ -116,7 +116,8 @@ public class TextIndexType extends AbstractIndexType<TextIndexConfig, TextIndexR Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING, "Text index is currently only supported on STRING type columns"); if (indexConfig.getFstType() == FSTType.NATIVE) { - return new NativeTextIndexCreator(context.getFieldSpec().getName(), context.getIndexDir()); + return new NativeTextIndexCreator(context.getFieldSpec().getName(), context.getTableNameWithType(), + context.isContinueOnError(), context.getIndexDir()); } else { return new LuceneTextIndexCreator(context, indexConfig); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java similarity index 54% copy from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java 
index 218b4f3961e..29695833b86 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java @@ -16,34 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.spi.index.creator; +package org.apache.pinot.segment.local.utils; -import java.io.IOException; -import javax.annotation.Nullable; -import org.apache.pinot.segment.spi.index.IndexCreator; +import java.util.Locale; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; -public interface FSTIndexCreator extends IndexCreator { +/** + * Utils for metrics + */ +public class MetricUtils { - @Override - default void add(Object value, int dictId) - throws IOException { - // FST indexes should do nothing when called for each row - } - @Override - default void add(Object[] values, @Nullable int[] dictIds) - throws IOException { - // FST indexes should do nothing when called for each row + private MetricUtils() { } - /** - * Adds the next document. 
- */ - void add(String document); - - /** - * Adds a set of documents to the index - */ - void add(String[] document, int length); + public static void updateIndexingErrorMetric(String tableNameWithType, String indexDisplayName) { + String metricKeyName = + tableNameWithType + "-" + indexDisplayName.toUpperCase(Locale.US) + "-indexingError"; + ServerMetrics.get().addMeteredTableValue(metricKeyName, ServerMeter.INDEXING_FAILURES, 1); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java index 7493d43db6f..7e1bee3bf57 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java @@ -92,9 +92,9 @@ public class H3IndexTest implements PinotBuffersAfterMethodCheckRule { try (MutableH3Index mutableH3Index = new MutableH3Index(h3IndexResolution)) { try (GeoSpatialIndexCreator onHeapCreator = new OnHeapH3IndexCreator(TEMP_DIR, onHeapColumnName, - "myTable_OFFLINE", h3IndexResolution); + "myTable_OFFLINE", false, h3IndexResolution); GeoSpatialIndexCreator offHeapCreator = new OffHeapH3IndexCreator(TEMP_DIR, offHeapColumnName, - "myTable_OFFLINE", h3IndexResolution)) { + "myTable_OFFLINE", false, h3IndexResolution)) { int docId = 0; while (expectedCardinalities.size() < numUniqueH3Ids) { double longitude = RANDOM.nextDouble() * 360 - 180; @@ -134,7 +134,7 @@ public class H3IndexTest implements PinotBuffersAfterMethodCheckRule { int res = 5; H3IndexResolution resolution = new H3IndexResolution(Collections.singletonList(res)); - try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", + try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", true, resolution)) { Point point = 
GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(10, 20)); creator.add(point); @@ -160,7 +160,7 @@ public class H3IndexTest implements PinotBuffersAfterMethodCheckRule { int res = 5; H3IndexResolution resolution = new H3IndexResolution(Collections.singletonList(res)); - try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", + try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", true, resolution)) { Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(10, 20)); creator.add(point); @@ -186,7 +186,7 @@ public class H3IndexTest implements PinotBuffersAfterMethodCheckRule { int res = 5; H3IndexResolution resolution = new H3IndexResolution(Collections.singletonList(res)); - try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", + try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", true, resolution)) { Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(10, 42)); creator.add(point); @@ -208,6 +208,60 @@ public class H3IndexTest implements PinotBuffersAfterMethodCheckRule { } } + @Test + public void testSkipInvalidGeometryContinueOnErrorFalse() + throws Exception { + String columnName = "skipInvalid"; + int res = 5; + H3IndexResolution resolution = new H3IndexResolution(Collections.singletonList(res)); + + try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", false, + resolution)) { + Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(10, 20)); + creator.add(point); + + // Invalid serialized bytes should be skipped without throwing exception + Assert.assertThrows(IllegalStateException.class, () -> creator.add(new byte[]{1, 2, 3}, -1)); + } + } + + @Test + public void testSkipNullGeometryContinueOnErrorFalse() + throws Exception { + String columnName = 
"skipNull"; + int res = 5; + H3IndexResolution resolution = new H3IndexResolution(Collections.singletonList(res)); + + try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", false, + resolution)) { + Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(10, 20)); + creator.add(point); + + // With continueOnError disabled, explicit null geometry should also throw instead of being skipped + Assert.assertThrows(IllegalStateException.class, () -> creator.add(null)); + } + } + + @Test + public void testSkipNonPointGeometryContinueOnErrorFalse() + throws Exception { + String columnName = "skipInvalidGeometryType"; + int res = 5; + H3IndexResolution resolution = new H3IndexResolution(Collections.singletonList(res)); + + try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, columnName, "myTable_OFFLINE", false, + resolution)) { + Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(10, 42)); + creator.add(point); + + // With continueOnError disabled, explicit non-point geometry should also throw instead of being skipped + Point[] points = new Point[1]; + points[0] = point; + MultiPoint multiPoint = GeometryUtils.GEOMETRY_FACTORY.createMultiPoint(points); + Assert.assertThrows(IllegalStateException.class, () -> creator.add(multiPoint)); + } + } + public static class ConfTest extends AbstractSerdeIndexContract { protected void assertEquals(H3IndexConfig expected) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java index 8f9b477ae20..b576a443942 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java @@ -397,9 +397,24 @@ public class JsonIndexTest 
implements PinotBuffersAfterMethodCheckRule { */ private void createIndex(boolean createOnHeap, JsonIndexConfig 
jsonIndexConfig, String[] records) throws IOException { + createIndex(createOnHeap, jsonIndexConfig, records, false); + } + + /** + * Creates a JSON index with the given config and adds the given records + * @param createOnHeap Whether to create an on-heap index + * @param jsonIndexConfig the JSON index config + * @param records the records to be added to the index + * @param continueOnError whether continueOnError should be enabled or disabled + * @throws IOException on error + */ + private void createIndex(boolean createOnHeap, JsonIndexConfig jsonIndexConfig, String[] records, boolean continueOnError) + throws IOException { try (JsonIndexCreator indexCreator = createOnHeap - ? new OnHeapJsonIndexCreator(INDEX_DIR, ON_HEAP_COLUMN_NAME, jsonIndexConfig) - : new OffHeapJsonIndexCreator(INDEX_DIR, OFF_HEAP_COLUMN_NAME, jsonIndexConfig)) { + ? new OnHeapJsonIndexCreator(INDEX_DIR, ON_HEAP_COLUMN_NAME, "myTable_OFFLINE", continueOnError, + jsonIndexConfig) + : new OffHeapJsonIndexCreator(INDEX_DIR, OFF_HEAP_COLUMN_NAME, "myTable_OFFLINE", continueOnError, + jsonIndexConfig)) { for (String record : records) { indexCreator.add(record); } @@ -453,7 +468,8 @@ public class JsonIndexTest implements PinotBuffersAfterMethodCheckRule { }; String colName = "col"; - try (JsonIndexCreator offHeapCreator = new OffHeapJsonIndexCreator(INDEX_DIR, colName, getIndexConfig()); + try (JsonIndexCreator offHeapCreator = new OffHeapJsonIndexCreator(INDEX_DIR, colName, "myTable_OFFLINE", + false, getIndexConfig()); MutableJsonIndexImpl mutableIndex = new MutableJsonIndexImpl(getIndexConfig(), "table__0__1", "col")) { for (String record : records) { offHeapCreator.add(record); @@ -522,7 +538,8 @@ public class JsonIndexTest implements PinotBuffersAfterMethodCheckRule { // @formatter: on String colName = "col"; - try (JsonIndexCreator offHeapCreator = new OffHeapJsonIndexCreator(INDEX_DIR, colName, getIndexConfig()); + try (JsonIndexCreator offHeapCreator = new 
OffHeapJsonIndexCreator(INDEX_DIR, colName, "myTable_OFFLINE", + false, getIndexConfig()); MutableJsonIndexImpl mutableIndex = new MutableJsonIndexImpl(getIndexConfig(), "table__0__1", "col")) { for (String record : records) { offHeapCreator.add(record); @@ -817,7 +834,7 @@ public class JsonIndexTest implements PinotBuffersAfterMethodCheckRule { } @Test - public void testSkipInvalidJsonEnable() throws Exception { + public void testSkipInvalidJsonEnableContinueOnErrorFalse() throws Exception { JsonIndexConfig jsonIndexConfig = getIndexConfig(); jsonIndexConfig.setSkipInvalidJson(true); // the braces don't match and cannot be parsed @@ -849,8 +866,68 @@ public class JsonIndexTest implements PinotBuffersAfterMethodCheckRule { } } + @Test + public void testSkipInvalidJsonEnableContinueOnErrorTrue() throws Exception { + JsonIndexConfig jsonIndexConfig = getIndexConfig(); + jsonIndexConfig.setSkipInvalidJson(true); + // the braces don't match and cannot be parsed + String[] records = {"{\"key1\":\"va\""}; + + createIndex(true, jsonIndexConfig, records, true); + File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + assertTrue(onHeapIndexFile.exists()); + + createIndex(false, jsonIndexConfig, records, true); + File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + assertTrue(offHeapIndexFile.exists()); + + try (PinotDataBuffer onHeapBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile); + PinotDataBuffer offHeapBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile); + JsonIndexReader onHeapReader = new ImmutableJsonIndexReader(onHeapBuffer, records.length); + JsonIndexReader offHeapReader = new ImmutableJsonIndexReader(offHeapBuffer, records.length); + MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(jsonIndexConfig, "table__0__1", "col")) { + for (String record : records) { + mutableJsonIndex.add(record); + } 
+ Map<String, RoaringBitmap> onHeapRes = getMatchingDocsMap(onHeapReader, "$"); + Map<String, RoaringBitmap> offHeapRes = getMatchingDocsMap(offHeapReader, "$"); + Map<String, RoaringBitmap> mutableRes = mutableJsonIndex.getMatchingFlattenedDocsMap("$", null); + Object expectedRes = Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT, RoaringBitmap.bitmapOf(0)); + assertEquals(onHeapRes, expectedRes); + assertEquals(offHeapRes, expectedRes); + assertEquals(mutableRes, expectedRes); + } + } + + @Test + public void testSkipInvalidJsonDisabledContinueOnErrorTrue() throws Exception { + // by default, skipInvalidJson is disabled + JsonIndexConfig jsonIndexConfig = getIndexConfig(); + // the braces don't match and cannot be parsed + String[] records = {"{\"key1\":\"va\""}; + + createIndex(true, jsonIndexConfig, records, true); + File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + assertTrue(onHeapIndexFile.exists()); + + createIndex(false, jsonIndexConfig, records, true); + File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + assertTrue(offHeapIndexFile.exists()); + + try (PinotDataBuffer onHeapBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile); + PinotDataBuffer offHeapBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile); + JsonIndexReader onHeapReader = new ImmutableJsonIndexReader(onHeapBuffer, records.length); + JsonIndexReader offHeapReader = new ImmutableJsonIndexReader(offHeapBuffer, records.length)) { + Map<String, RoaringBitmap> onHeapRes = getMatchingDocsMap(onHeapReader, "$"); + Map<String, RoaringBitmap> offHeapRes = getMatchingDocsMap(offHeapReader, "$"); + Object expectedRes = Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT, RoaringBitmap.bitmapOf(0)); + assertEquals(onHeapRes, expectedRes); + assertEquals(offHeapRes, expectedRes); + } + } + @Test(expectedExceptions = 
JsonProcessingException.class) - public void testSkipInvalidJsonDisabled() throws Exception { + public void testSkipInvalidJsonDisabledContinueOnErrorFalse() throws Exception { // by default, skipInvalidJson is disabled JsonIndexConfig jsonIndexConfig = getIndexConfig(); // the braces don't match and cannot be parsed diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java new file mode 100644 index 00000000000..e1ed2625716 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.index.creator; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.creator.impl.vector.HnswVectorIndexCreator; +import org.apache.pinot.segment.local.segment.index.readers.vector.HnswVectorIndexReader; +import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class HnswVectorIndexCreatorTest { + private static final File INDEX_DIR = + new File(FileUtils.getTempDirectory(), HnswVectorIndexCreatorTest.class.toString()); + private VectorIndexConfig _config; + + @BeforeMethod + public void setUp() + throws IOException { + FileUtils.forceMkdir(INDEX_DIR); + + Map<String, String> properties = new HashMap<>(); + + properties.put("vectorIndexType", "HNSW"); + properties.put("vectorDimension", "1536"); + + _config = new VectorIndexConfig(properties); + try (HnswVectorIndexCreator creator = new HnswVectorIndexCreator("foo", INDEX_DIR, _config)) { + float[] values1 = new float[] {5.0F, 42.0F, 54.33333F, 42.24F, 1001.045F}; + creator.add(values1); + float[] values2 = new float[] {42.0F, 23423.0F, 42431.32532F, 6785676.3242F, 42.3F}; + creator.add(values2); + float[] values3 = new float[] {1.0F, 2.0F, 3.0F, 4.0F, 5.0F}; + creator.add(values3); + float[] values4 = new float[] {42.678F, 23423423.0F, 42431.32523432F, 6723485.3242F, 42342.3F}; + creator.add(values4); + creator.seal(); + } + } + + @AfterMethod + public void tearDown() + throws IOException { + FileUtils.deleteDirectory(INDEX_DIR); + } + + @Test + public void testIndexWriterReaderWithTop3() + throws IOException { + // Use VectorIndex reader to validate that reads work + try (HnswVectorIndexReader reader = new HnswVectorIndexReader("foo", INDEX_DIR, 4, 
_config)) { + int[] matchedDocIds = reader.getDocIds(new float[]{5.0F, 42.0F, 54.33333F, 42.24F, 3413.4F}, 3).toArray(); + // Expect to get 3 matching docIds since topK = 3 is used + Assert.assertEquals(matchedDocIds.length, 3); + Assert.assertEquals(matchedDocIds[0], 0); + Assert.assertEquals(matchedDocIds[1], 2); + Assert.assertEquals(matchedDocIds[1], 2); + } + } + + @Test + public void testIndexWriterReaderWithTop1() + throws IOException { + // Use VectorIndex reader to validate that reads work + try (HnswVectorIndexReader reader = new HnswVectorIndexReader("foo", INDEX_DIR, 4, _config)) { + int[] matchedDocIds = reader.getDocIds(new float[]{1.0F, 2.0F, 3.0F, 4.0F, 5.0F}, 1).toArray(); + // Expect to get 1 matching docId since topK = 1 is used + Assert.assertEquals(matchedDocIds.length, 1); + Assert.assertEquals(matchedDocIds[0], 2); + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java index 0e88cc73e9f..870983e97ab 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java @@ -25,15 +25,18 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule; import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator; import org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader; +import org.apache.pinot.segment.local.utils.fst.FSTBuilder; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; -import org.apache.pinot.spi.data.DimensionFieldSpec; -import org.apache.pinot.spi.data.FieldSpec; +import org.mockito.Mockito; import org.testng.Assert; import 
org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.apache.pinot.segment.spi.V1Constants.Indexes.LUCENE_V912_FST_INDEX_FILE_EXTENSION; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; public class LuceneFSTIndexCreatorTest implements PinotBuffersAfterMethodCheckRule { @@ -59,9 +62,8 @@ public class LuceneFSTIndexCreatorTest implements PinotBuffersAfterMethodCheckRu uniqueValues[1] = "hello-world123"; uniqueValues[2] = "still"; - FieldSpec fieldSpec = new DimensionFieldSpec("testFSTColumn", FieldSpec.DataType.STRING, true); - LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator( - INDEX_DIR, "testFSTColumn", uniqueValues); + LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(INDEX_DIR, "testFSTColumn", "myTable_OFFLINE", + false, uniqueValues); creator.seal(); File fstFile = new File(INDEX_DIR, "testFSTColumn" + LUCENE_V912_FST_INDEX_FILE_EXTENSION); try (PinotDataBuffer pinotDataBuffer = @@ -69,12 +71,64 @@ public class LuceneFSTIndexCreatorTest implements PinotBuffersAfterMethodCheckRu LuceneFSTIndexReader reader = new LuceneFSTIndexReader(pinotDataBuffer)) { int[] matchedDictIds = reader.getDictIds("hello.*").toArray(); - Assert.assertEquals(2, matchedDictIds.length); - Assert.assertEquals(0, matchedDictIds[0]); - Assert.assertEquals(1, matchedDictIds[1]); + Assert.assertEquals(matchedDictIds.length, 2); + Assert.assertEquals(matchedDictIds[0], 0); + Assert.assertEquals(matchedDictIds[1], 1); matchedDictIds = reader.getDictIds(".*llo").toArray(); - Assert.assertEquals(0, matchedDictIds.length); + Assert.assertEquals(matchedDictIds.length, 0); + + matchedDictIds = reader.getDictIds("st.*").toArray(); + Assert.assertEquals(matchedDictIds.length, 1); + Assert.assertEquals(matchedDictIds[0], 2); + } + } + + @Test + public void 
testIndexWriterReaderWithAddExceptionsContinueOnErrorTrue() + throws IOException { + String[] uniqueValues = new String[3]; + uniqueValues[0] = "hello-world"; + uniqueValues[1] = "hello-world123"; + uniqueValues[2] = "still"; + + FSTBuilder fstBuilder = Mockito.spy(new FSTBuilder()); + // For the word "still" throw an exception so it is not indexed + doThrow(IOException.class).when(fstBuilder).addEntry(eq("still"), anyInt()); + LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(INDEX_DIR, "testFSTColumn", "myTable_OFFLINE", + true, uniqueValues, fstBuilder); + creator.seal(); + File fstFile = new File(INDEX_DIR, "testFSTColumn" + LUCENE_V912_FST_INDEX_FILE_EXTENSION); + try (PinotDataBuffer pinotDataBuffer = + PinotDataBuffer.mapFile(fstFile, true, 0, fstFile.length(), ByteOrder.BIG_ENDIAN, "fstIndexFile"); + LuceneFSTIndexReader reader = new LuceneFSTIndexReader(pinotDataBuffer)) { + + int[] matchedDictIds = reader.getDictIds("hello.*").toArray(); + Assert.assertEquals(matchedDictIds.length, 2); + Assert.assertEquals(matchedDictIds[0], 0); + Assert.assertEquals(matchedDictIds[1], 1); + + matchedDictIds = reader.getDictIds(".*llo").toArray(); + Assert.assertEquals(matchedDictIds.length, 0); + + // Validate that nothing matches st.* + matchedDictIds = reader.getDictIds("st.*").toArray(); + Assert.assertEquals(matchedDictIds.length, 0); } } + + @Test + public void testIndexWriterReaderWithAddExceptionsContinueOnErrorFalse() + throws IOException { + String[] uniqueValues = new String[3]; + uniqueValues[0] = "hello-world"; + uniqueValues[1] = "hello-world123"; + uniqueValues[2] = "still"; + + FSTBuilder fstBuilder = Mockito.spy(new FSTBuilder()); + // For the word "still" throw an exception so it is not indexed + doThrow(IOException.class).when(fstBuilder).addEntry(eq("still"), anyInt()); + Assert.assertThrows(IOException.class, () -> new LuceneFSTIndexCreator(INDEX_DIR, "testFSTColumn", + "myTable_OFFLINE", false, uniqueValues, fstBuilder)); + } } diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java new file mode 100644 index 00000000000..c8278188be6 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.index.creator; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader; +import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class LuceneTextIndexCreatorTest { + private static final File INDEX_DIR = + new File(FileUtils.getTempDirectory(), LuceneTextIndexCreatorTest.class.toString()); + + @BeforeMethod + public void setUp() + throws IOException { + FileUtils.forceMkdir(INDEX_DIR); + + TextIndexConfig config = new TextIndexConfig(false, null, null, false, false, null, null, true, 500, null, null, + null, null, false, false, 0, false, null); + try (LuceneTextIndexCreator creator = new LuceneTextIndexCreator("foo", INDEX_DIR, true, false, null, null, + config)) { + creator.add("{\"clean\":\"this\"}"); + creator.add("{\"retain\":\"this\"}"); + creator.add("{\"keep\":\"this\"}"); + creator.add("{\"hold\":\"this\"}"); + creator.add("{\"clean\":\"that\"}"); + creator.seal(); + } + } + + @AfterMethod + public void tearDown() + throws IOException { + FileUtils.deleteDirectory(INDEX_DIR); + } + + @Test + public void testIndexWriterReaderMatchClean() + throws IOException { + // Use TextIndex reader to validate that reads work + try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", INDEX_DIR, 5, new HashMap<>())) { + int[] matchedDocIds = reader.getDocIds("clean").toArray(); + Assert.assertEquals(matchedDocIds.length, 2); + Assert.assertEquals(matchedDocIds[0], 0); + Assert.assertEquals(matchedDocIds[1], 4); + } + } + + @Test + public void testIndexWriterReaderMatchHold() + throws IOException { + // Use 
TextIndex reader to validate that reads work + try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", INDEX_DIR, 5, new HashMap<>())) { + int[] matchedDocIds = reader.getDocIds("hold").toArray(); + Assert.assertEquals(matchedDocIds.length, 1); + Assert.assertEquals(matchedDocIds[0], 3); + } + } + + @Test + public void testIndexWriterReaderMatchRetain() + throws IOException { + // Use TextIndex reader to validate that reads work + try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", INDEX_DIR, 5, new HashMap<>())) { + int[] matchedDocIds = reader.getDocIds("retain").toArray(); + Assert.assertEquals(matchedDocIds.length, 1); + Assert.assertEquals(matchedDocIds[0], 1); + } + } + + @Test + public void testIndexWriterReaderMatchWithOrClause() + throws IOException { + // Use TextIndex reader to validate that reads work + try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", INDEX_DIR, 5, new HashMap<>())) { + int[] matchedDocIds = reader.getDocIds("retain|keep").toArray(); + Assert.assertEquals(matchedDocIds.length, 2); + Assert.assertEquals(matchedDocIds[0], 1); + Assert.assertEquals(matchedDocIds[1], 2); + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java index 34189413aa2..2729c42ef4a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java @@ -24,13 +24,15 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule; import org.apache.pinot.segment.local.segment.creator.impl.text.NativeTextIndexCreator; import 
org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader; -import org.apache.pinot.spi.data.DimensionFieldSpec; -import org.apache.pinot.spi.data.FieldSpec; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.apache.pinot.segment.spi.V1Constants.Indexes.NATIVE_TEXT_INDEX_FILE_EXTENSION; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; @@ -58,8 +60,8 @@ public class NativeTextIndexCreatorTest implements PinotBuffersAfterMethodCheckR uniqueValues[2] = "still"; uniqueValues[3] = "zoobar"; - FieldSpec fieldSpec = new DimensionFieldSpec("testFSTColumn", FieldSpec.DataType.STRING, true); - try (NativeTextIndexCreator creator = new NativeTextIndexCreator("testFSTColumn", INDEX_DIR)) { + try (NativeTextIndexCreator creator = new NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE", false, + INDEX_DIR)) { for (int i = 0; i < 4; i++) { creator.add(uniqueValues[i]); } @@ -93,4 +95,81 @@ public class NativeTextIndexCreatorTest implements PinotBuffersAfterMethodCheckR } } } + + @Test + public void testIndexWriterReaderWithAddExceptionsWithContinueOnErrorTrue() + throws IOException { + String[] uniqueValues = new String[4]; + uniqueValues[0] = "still"; + uniqueValues[1] = "zoobar"; + uniqueValues[2] = "hello-world"; + uniqueValues[3] = "hello-world123"; + + try (NativeTextIndexCreator creator = spy(new NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE", + true, INDEX_DIR))) { + // Add a couple of words so they show up in the index + for (int i = 0; i < 2; i++) { + creator.add(uniqueValues[i]); + } + + // Throw exception for the remaining words + doThrow(RuntimeException.class).when(creator).analyze(anyString()); + for (int i = 2; i < 4; i++) { + creator.add(uniqueValues[i]); + } + + creator.seal(); 
+ } + + File fstFile = new File(INDEX_DIR, "testFSTColumn" + NATIVE_TEXT_INDEX_FILE_EXTENSION); + try (NativeTextIndexReader reader = new NativeTextIndexReader("testFSTColumn", fstFile.getParentFile())) { + try { + int[] matchedDocIds = reader.getDocIds("hello.*").toArray(); + assertEquals(matchedDocIds.length, 0); + + matchedDocIds = reader.getDocIds(".*llo").toArray(); + assertEquals(matchedDocIds.length, 0); + + matchedDocIds = reader.getDocIds("wor.*").toArray(); + assertEquals(matchedDocIds.length, 0); + + matchedDocIds = reader.getDocIds("zoo.*").toArray(); + assertEquals(matchedDocIds.length, 1); + assertEquals(matchedDocIds[0], 1); + + matchedDocIds = reader.getDocIds(".*il.*").toArray(); + assertEquals(matchedDocIds.length, 1); + assertEquals(matchedDocIds[0], 0); + + matchedDocIds = reader.getDocIds(".*").toArray(); + assertEquals(matchedDocIds.length, 2); + assertEquals(matchedDocIds[0], 0); + assertEquals(matchedDocIds[1], 1); + } finally { + reader.closeInTest(); + } + } + } + + @Test + public void testIndexWriterReaderWithAddExceptionsWithContinueOnErrorFalse() + throws IOException { + String[] uniqueValues = new String[4]; + uniqueValues[0] = "still"; + uniqueValues[1] = "zoobar"; + uniqueValues[2] = "hello-world"; + uniqueValues[3] = "hello-world123"; + + try (NativeTextIndexCreator creator = spy(new NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE", + false, INDEX_DIR))) { + // Add a couple of words so they show up in the index + for (int i = 0; i < 2; i++) { + creator.add(uniqueValues[i]); + } + + // Throw exception for the remaining words + doThrow(RuntimeException.class).when(creator).analyze(anyString()); + Assert.assertThrows(RuntimeException.class, () -> creator.add(uniqueValues[2])); + } + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java index 
e1349741bfd..f41133629b5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java @@ -178,7 +178,7 @@ public class FilePerIndexDirectoryTest implements PinotBuffersAfterMethodCheckRu throws IOException { // See https://github.com/apache/pinot/issues/11529 try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap); - NativeTextIndexCreator fooCreator = new NativeTextIndexCreator("foo", TEMP_DIR)) { + NativeTextIndexCreator fooCreator = new NativeTextIndexCreator("foo", "myTable_OFFLINE", false, TEMP_DIR)) { fooCreator.add("{\"clean\":\"this\"}"); fooCreator.seal(); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java index 218b4f3961e..f2174082342 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java @@ -40,7 +40,8 @@ public interface FSTIndexCreator extends IndexCreator { /** * Adds the next document. 
*/ - void add(String document); + void add(String document) + throws IOException; /** * Adds a set of documents to the index diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java index 62be6acc478..7fef8e04827 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.segment.spi.index.IndexCreator; -import org.apache.pinot.spi.utils.JsonUtils; /** @@ -36,7 +35,7 @@ public interface JsonIndexCreator extends IndexCreator { default void add(Object value, int dictId) throws IOException { if (value instanceof Map) { - add(JsonUtils.objectToString(value)); + add((Map) value); } else { add((String) value); } @@ -52,6 +51,12 @@ public interface JsonIndexCreator extends IndexCreator { void add(String jsonString) throws IOException; + /** + * Adds the next json value for Map type + */ + void add(Map jsonMap) + throws IOException; + /** * Seals the index and flushes it to disk. 
*/ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java index ff2309968fa..83b7d351b89 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java @@ -79,7 +79,7 @@ public class JsonUtils { public static final String ARRAY_INDEX_KEY = ".$index"; public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$"; public static final int MAX_COMBINATIONS = 100_000; - private static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD = + public static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD = Collections.singletonList(Collections.singletonMap(VALUE_KEY, SKIPPED_VALUE_REPLACEMENT)); // For querying @@ -736,14 +736,14 @@ public class JsonUtils { JsonNode jsonNode; try { jsonNode = JsonUtils.stringToJsonNode(jsonString); - } catch (JsonProcessingException e) { + return JsonUtils.flatten(jsonNode, jsonIndexConfig); + } catch (Exception e) { if (jsonIndexConfig.getSkipInvalidJson()) { return SKIPPED_FLATTENED_RECORD; } else { throw e; } } - return JsonUtils.flatten(jsonNode, jsonIndexConfig); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org