This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dd84b52d52 Add bad data handling for some IndexCreator::add() functions to skip record or add dummy record (#16094)
6dd84b52d52 is described below

commit 6dd84b52d528526a45f783aa34414be5a4dee71f
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Mon Jul 14 17:23:44 2025 -0700

    Add bad data handling for some IndexCreator::add() functions to skip record or add dummy record (#16094)
    
    * Add bad data handling for some IndexCreator::add() functions to skip record or add dummy record
    
    * Add tableNameWithType to update the metric with table name included
    
    * Address review comments
    
    * Use continueOnError flag for FST and native text index creators
    
    * Compare the returned flattened record to the default JsonUtils.SKIPPED_FLATTENED_RECORD and update the metric if they are equal
    
    * Use _continueOnError even for JSON index
    
    * Update H3 index creator to use continueOnError flag
    
    * Address review comments
---
 .../accounting/ResourceManagerAccountingTest.java  |   3 +-
 .../impl/inv/geospatial/BaseH3IndexCreator.java    |  19 ++--
 .../impl/inv/geospatial/OffHeapH3IndexCreator.java |   5 +-
 .../impl/inv/geospatial/OnHeapH3IndexCreator.java  |   5 +-
 .../impl/inv/json/BaseJsonIndexCreator.java        |  51 +++++++++-
 .../impl/inv/json/OffHeapJsonIndexCreator.java     |   5 +-
 .../impl/inv/json/OnHeapJsonIndexCreator.java      |   5 +-
 .../impl/inv/text/LuceneFSTIndexCreator.java       |  55 +++++++++--
 .../creator/impl/text/NativeTextIndexCreator.java  |  39 +++++++-
 .../local/segment/index/h3/H3IndexType.java        |   4 +-
 .../local/segment/index/json/JsonIndexType.java    |   5 +-
 .../local/segment/index/text/TextIndexType.java    |   3 +-
 .../pinot/segment/local/utils/MetricUtils.java     |  37 +++----
 .../segment/local/segment/index/H3IndexTest.java   |  64 +++++++++++-
 .../segment/local/segment/index/JsonIndexTest.java |  89 +++++++++++++++--
 .../index/creator/HnswVectorIndexCreatorTest.java  |  95 ++++++++++++++++++
 .../index/creator/LuceneFSTIndexCreatorTest.java   |  72 ++++++++++++--
 .../index/creator/LuceneTextIndexCreatorTest.java  | 107 +++++++++++++++++++++
 .../index/creator/NativeTextIndexCreatorTest.java  |  87 ++++++++++++++++-
 .../segment/store/FilePerIndexDirectoryTest.java   |   2 +-
 .../segment/spi/index/creator/FSTIndexCreator.java |   3 +-
 .../spi/index/creator/JsonIndexCreator.java        |   9 +-
 .../java/org/apache/pinot/spi/utils/JsonUtils.java |   6 +-
 23 files changed, 680 insertions(+), 90 deletions(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index b72cc621a97..38ab43ba220 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -520,7 +520,8 @@ public class ResourceManagerAccountingTest {
     File indexDir = new File(FileUtils.getTempDirectory(), 
"testJsonIndexExtractMapOOM");
     FileUtils.forceMkdir(indexDir);
     String colName = "col";
-    try (JsonIndexCreator offHeapIndexCreator = new 
OffHeapJsonIndexCreator(indexDir, colName, new JsonIndexConfig());
+    try (JsonIndexCreator offHeapIndexCreator = new 
OffHeapJsonIndexCreator(indexDir, colName, "myTable_OFFLINE",
+        false, new JsonIndexConfig());
         MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(new 
JsonIndexConfig(), "table__0__1", "col")) {
       // build json indexes
       for (int i = 0; i < 1000000; i++) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
index b3861073619..888d5cbb004 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/BaseH3IndexCreator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial;
 
+import com.google.common.base.Preconditions;
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -26,16 +27,14 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.segment.index.h3.H3IndexType;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.segment.local.utils.H3Utils;
+import org.apache.pinot.segment.local.utils.MetricUtils;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
@@ -72,6 +71,7 @@ public abstract class BaseH3IndexCreator implements 
GeoSpatialIndexCreator {
   static final String BITMAP_VALUE_FILE_NAME = "bitmap.value.buf";
 
   final String _tableNameWithType;
+  final boolean _continueOnError;
   final File _indexFile;
   final File _tempDir;
   final File _dictionaryFile;
@@ -88,9 +88,11 @@ public abstract class BaseH3IndexCreator implements 
GeoSpatialIndexCreator {
 
   int _nextDocId;
 
-  BaseH3IndexCreator(File indexDir, String columnName, String 
tableNameWithType, H3IndexResolution resolution)
+  BaseH3IndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      H3IndexResolution resolution)
       throws IOException {
     _tableNameWithType = tableNameWithType;
+    _continueOnError = continueOnError;
     _indexFile = new File(indexDir, columnName + 
V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
     _tempDir = new File(indexDir, columnName + TEMP_DIR_SUFFIX);
     if (_tempDir.exists()) {
@@ -116,13 +118,14 @@ public abstract class BaseH3IndexCreator implements 
GeoSpatialIndexCreator {
   @Override
   public void add(@Nullable Geometry geometry)
       throws IOException {
-    if (geometry == null || !(geometry instanceof Point)) {
-      String metricKeyName =
-          _tableNameWithType + "-" + 
H3IndexType.INDEX_DISPLAY_NAME.toUpperCase(Locale.US) + "-indexingError";
-      ServerMetrics.get().addMeteredTableValue(metricKeyName, 
ServerMeter.INDEXING_FAILURES, 1);
+    if (_continueOnError && !(geometry instanceof Point)) {
+      MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
H3IndexType.INDEX_DISPLAY_NAME);
       _nextDocId++;
       return;
     }
+    Preconditions.checkState(geometry != null, "Null geometry record found and 
continueOnError is disabled");
+    Preconditions.checkState(geometry instanceof Point, "H3 index can only be 
applied to Point, got: %s",
+        geometry.getGeometryType());
     Coordinate coordinate = geometry.getCoordinate();
     // TODO: support multiple resolutions
     long h3Id = H3Utils.H3_CORE.latLngToCell(coordinate.y, coordinate.x, 
_lowestResolution);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
index 35240582ca0..72fb3ddf066 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OffHeapH3IndexCreator.java
@@ -61,9 +61,10 @@ public class OffHeapH3IndexCreator extends 
BaseH3IndexCreator {
 
   private long _postingListChunkOffset;
 
-  public OffHeapH3IndexCreator(File indexDir, String columnName, String 
tableNameWithType, H3IndexResolution resolution)
+  public OffHeapH3IndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      H3IndexResolution resolution)
       throws IOException {
-    super(indexDir, columnName, tableNameWithType, resolution);
+    super(indexDir, columnName, tableNameWithType, continueOnError, 
resolution);
     _postingListFile = new File(_tempDir, POSTING_LIST_FILE_NAME);
     _postingListOutputStream = new DataOutputStream(new 
BufferedOutputStream(new FileOutputStream(_postingListFile)));
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
index c9583a45d75..c02b83e1a17 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/geospatial/OnHeapH3IndexCreator.java
@@ -34,9 +34,10 @@ import org.roaringbitmap.RoaringBitmapWriter;
  */
 public class OnHeapH3IndexCreator extends BaseH3IndexCreator {
 
-  public OnHeapH3IndexCreator(File indexDir, String columnName, String 
tableNameWithType, H3IndexResolution resolution)
+  public OnHeapH3IndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      H3IndexResolution resolution)
       throws IOException {
-    super(indexDir, columnName, tableNameWithType, resolution);
+    super(indexDir, columnName, tableNameWithType, continueOnError, 
resolution);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
index 5c3409fe45f..91479775b9f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.inv.json;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntList;
@@ -31,6 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.index.json.JsonIndexType;
+import org.apache.pinot.segment.local.utils.MetricUtils;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.memory.CleanerUtil;
@@ -62,6 +65,8 @@ public abstract class BaseJsonIndexCreator implements 
JsonIndexCreator {
   static final String DICTIONARY_FILE_NAME = "dictionary.buf";
   static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf";
 
+  final String _tableNameWithType;
+  final boolean _continueOnError;
   final JsonIndexConfig _jsonIndexConfig;
   final File _indexFile;
   final File _tempDir;
@@ -74,8 +79,11 @@ public abstract class BaseJsonIndexCreator implements 
JsonIndexCreator {
   int _nextFlattenedDocId;
   int _maxValueLength;
 
-  BaseJsonIndexCreator(File indexDir, String columnName, JsonIndexConfig 
jsonIndexConfig)
+  BaseJsonIndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      JsonIndexConfig jsonIndexConfig)
       throws IOException {
+    _tableNameWithType = tableNameWithType;
+    _continueOnError = continueOnError;
     _jsonIndexConfig = jsonIndexConfig;
     _indexFile = new File(indexDir, columnName + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
     _tempDir = new File(indexDir, columnName + TEMP_DIR_SUFFIX);
@@ -91,7 +99,46 @@ public abstract class BaseJsonIndexCreator implements 
JsonIndexCreator {
   @Override
   public void add(String jsonString)
       throws IOException {
-    addFlattenedRecords(JsonUtils.flatten(jsonString, _jsonIndexConfig));
+    List<Map<String, String>> flattenedRecord;
+    try {
+      flattenedRecord = JsonUtils.flatten(jsonString, _jsonIndexConfig);
+      if (flattenedRecord == JsonUtils.SKIPPED_FLATTENED_RECORD) {
+        // The default SKIPPED_FLATTENED_RECORD was returned, this can only 
happen if the original record could not be
+        // flattened, update the metric
+        MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
JsonIndexType.INDEX_DISPLAY_NAME);
+      }
+    } catch (Exception e) {
+      if (_continueOnError) {
+        // Caught exception while trying to add, update metric and add a 
default SKIPPED_FLATTENED_RECORD
+        // This check is needed in the case where 
`_jsonIndexConfig.getSkipInvalidJson()` is false,
+        // but _continueOnError is true
+        MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
JsonIndexType.INDEX_DISPLAY_NAME);
+        flattenedRecord = JsonUtils.SKIPPED_FLATTENED_RECORD;
+      } else {
+        throw e;
+      }
+    }
+    addFlattenedRecords(flattenedRecord);
+  }
+
+  @Override
+  public void add(Map value)
+      throws IOException {
+    String valueToAdd;
+    try {
+      // TODO: Avoid this ser/de from map -> string -> json node
+      valueToAdd = JsonUtils.objectToString(value);
+    } catch (JsonProcessingException e) {
+      if (_jsonIndexConfig.getSkipInvalidJson() || _continueOnError) {
+        // Caught exception while trying to add, update metric and add a 
default SKIPPED_FLATTENED_RECORD
+        MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
JsonIndexType.INDEX_DISPLAY_NAME);
+        addFlattenedRecords(JsonUtils.SKIPPED_FLATTENED_RECORD);
+        return;
+      } else {
+        throw e;
+      }
+    }
+    add(valueToAdd);
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
index 30c5d2b63b3..7b99072124d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OffHeapJsonIndexCreator.java
@@ -70,9 +70,10 @@ public class OffHeapJsonIndexCreator extends 
BaseJsonIndexCreator {
   private int _numPostingListsInLastChunk;
   private int _numPostingLists;
 
-  public OffHeapJsonIndexCreator(File indexDir, String columnName, 
JsonIndexConfig jsonIndexConfig)
+  public OffHeapJsonIndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      JsonIndexConfig jsonIndexConfig)
       throws IOException {
-    super(indexDir, columnName, jsonIndexConfig);
+    super(indexDir, columnName, tableNameWithType, continueOnError, 
jsonIndexConfig);
     _postingListFile = new File(_tempDir, POSTING_LIST_FILE_NAME);
     _postingListOutputStream = new DataOutputStream(new 
BufferedOutputStream(new FileOutputStream(_postingListFile)));
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
index bb45728c3f4..f09b8b717c5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/OnHeapJsonIndexCreator.java
@@ -39,9 +39,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  */
 public class OnHeapJsonIndexCreator extends BaseJsonIndexCreator {
 
-  public OnHeapJsonIndexCreator(File indexDir, String columnName, 
JsonIndexConfig jsonIndexConfig)
+  public OnHeapJsonIndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      JsonIndexConfig jsonIndexConfig)
       throws IOException {
-    super(indexDir, columnName, jsonIndexConfig);
+    super(indexDir, columnName, tableNameWithType, continueOnError, 
jsonIndexConfig);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
index c490a79db10..c410003d1b0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.inv.text;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import org.apache.lucene.store.OutputStreamDataOutput;
 import org.apache.lucene.util.fst.FST;
+import org.apache.pinot.segment.local.segment.index.fst.FstIndexType;
+import org.apache.pinot.segment.local.utils.MetricUtils;
 import org.apache.pinot.segment.local.utils.fst.FSTBuilder;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
@@ -40,6 +43,9 @@ import org.slf4j.LoggerFactory;
 public class LuceneFSTIndexCreator implements FSTIndexCreator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LuceneFSTIndexCreator.class);
   private final File _fstIndexFile;
+  private final String _columnName;
+  private final String _tableNameWithType;
+  private final boolean _continueOnError;
   private final FSTBuilder _fstBuilder;
   Integer _dictId;
 
@@ -50,36 +56,69 @@ public class LuceneFSTIndexCreator implements 
FSTIndexCreator {
    *
    * @param indexDir  Index directory
    * @param columnName Column name for which index is being created
+   * @param tableNameWithType table name with type
+   * @param continueOnError if true, don't throw exception on add() failures
    * @param sortedEntries Sorted entries of the unique values of the column.
    * @throws IOException
    */
-  public LuceneFSTIndexCreator(File indexDir, String columnName, String[] 
sortedEntries)
+  public LuceneFSTIndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      String[] sortedEntries)
       throws IOException {
+    this(indexDir, columnName, tableNameWithType, continueOnError, 
sortedEntries, new FSTBuilder());
+  }
+
+  @VisibleForTesting
+  public LuceneFSTIndexCreator(File indexDir, String columnName, String 
tableNameWithType, boolean continueOnError,
+      String[] sortedEntries, FSTBuilder fstBuilder)
+      throws IOException {
+    _columnName = columnName;
+    _tableNameWithType = tableNameWithType;
+    _continueOnError = continueOnError;
     _fstIndexFile = new File(indexDir, columnName + 
V1Constants.Indexes.LUCENE_V912_FST_INDEX_FILE_EXTENSION);
 
-    _fstBuilder = new FSTBuilder();
+    _fstBuilder = fstBuilder;
     _dictId = 0;
     if (sortedEntries != null) {
       for (_dictId = 0; _dictId < sortedEntries.length; _dictId++) {
-        _fstBuilder.addEntry(sortedEntries[_dictId], _dictId);
+        try {
+          _fstBuilder.addEntry(sortedEntries[_dictId], _dictId);
+        } catch (Exception ex) {
+          if (_continueOnError) {
+            // Caught exception while trying to add, update metric and skip 
the document
+            MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
FstIndexType.INDEX_DISPLAY_NAME);
+          } else {
+            LOGGER.error("Caught exception while trying to add to FST index 
for table: {}, column: {}",
+                tableNameWithType, columnName, ex);
+            throw ex;
+          }
+        }
       }
     }
   }
 
   public LuceneFSTIndexCreator(IndexCreationContext context)
       throws IOException {
-    this(context.getIndexDir(), context.getFieldSpec().getName(), (String[]) 
context.getSortedUniqueElementsArray());
+    this(context.getIndexDir(), context.getFieldSpec().getName(), 
context.getTableNameWithType(),
+        context.isContinueOnError(), (String[]) 
context.getSortedUniqueElementsArray());
   }
 
   // Expects dictionary entries in sorted order.
   @Override
-  public void add(String document) {
+  public void add(String document)
+      throws IOException {
     try {
       _fstBuilder.addEntry(document, _dictId);
-      _dictId++;
-    } catch (IOException ex) {
-      throw new RuntimeException("Unable to load the schema file", ex);
+    } catch (Exception ex) {
+      if (_continueOnError) {
+        // Caught exception while trying to add, update metric and skip the 
document
+        MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
FstIndexType.INDEX_DISPLAY_NAME);
+      } else {
+        LOGGER.error("Caught exception while trying to add to FST index for 
table: {}, column: {}",
+            _tableNameWithType, _columnName, ex);
+        throw ex;
+      }
     }
+    _dictId++;
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
index 7ef4d252143..dab54fb6408 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/text/NativeTextIndexCreator.java
@@ -35,6 +35,8 @@ import 
org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
 import 
org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
 import 
org.apache.pinot.segment.local.segment.index.text.AbstractTextIndexCreator;
 import 
org.apache.pinot.segment.local.segment.index.text.CaseAwareStandardAnalyzer;
+import org.apache.pinot.segment.local.segment.index.text.TextIndexType;
+import org.apache.pinot.segment.local.utils.MetricUtils;
 import org.apache.pinot.segment.local.utils.nativefst.FST;
 import org.apache.pinot.segment.local.utils.nativefst.FSTHeader;
 import org.apache.pinot.segment.local.utils.nativefst.builder.FSTBuilder;
@@ -42,11 +44,14 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.roaringbitmap.Container;
 import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.RoaringBitmapWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 public class NativeTextIndexCreator extends AbstractTextIndexCreator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(NativeTextIndexCreator.class);
   private static final String TEMP_DIR_SUFFIX = ".nativetext.idx.tmp";
   private static final String FST_FILE_NAME = "native.fst";
   private static final String INVERTED_INDEX_FILE_NAME = "inverted.index.buf";
@@ -62,6 +67,8 @@ public class NativeTextIndexCreator extends 
AbstractTextIndexCreator {
   public static final int VERSION = 1;
 
   private final String _columnName;
+  private final String _tableNameWithType;
+  private final boolean _continueOnError;
   private final FSTBuilder _fstBuilder;
   private final File _indexFile;
   private final File _tempDir;
@@ -74,9 +81,11 @@ public class NativeTextIndexCreator extends 
AbstractTextIndexCreator {
   private int _fstDataSize;
   private int _numBitMaps;
 
-  public NativeTextIndexCreator(String column, File indexDir)
+  public NativeTextIndexCreator(String column, String tableNameWithType, 
boolean continueOnError, File indexDir)
       throws IOException {
     _columnName = column;
+    _tableNameWithType = tableNameWithType;
+    _continueOnError = continueOnError;
     _fstBuilder = new FSTBuilder();
     _indexFile = new File(indexDir, column + 
V1Constants.Indexes.NATIVE_TEXT_INDEX_FILE_EXTENSION);
     _tempDir = new File(indexDir, column + TEMP_DIR_SUFFIX);
@@ -92,14 +101,36 @@ public class NativeTextIndexCreator extends 
AbstractTextIndexCreator {
 
   @Override
   public void add(String document) {
-    addHelper(document);
+    try {
+      addHelper(document);
+    } catch (RuntimeException e) {
+      if (_continueOnError) {
+        // Caught exception while trying to add, update metric and skip the 
document
+        MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
TextIndexType.INDEX_DISPLAY_NAME);
+      } else {
+        LOGGER.error("Caught exception while trying to add to native text 
index for table: {}, column: {}",
+            _tableNameWithType, _columnName, e);
+        throw e;
+      }
+    }
     _nextDocId++;
   }
 
   @Override
   public void add(String[] documents, int length) {
-    for (int i = 0; i < length; i++) {
-      addHelper(documents[i]);
+    try {
+      for (int i = 0; i < length; i++) {
+        addHelper(documents[i]);
+      }
+    } catch (RuntimeException e) {
+      if (_continueOnError) {
+        // Caught exception while trying to add, update metric and skip the 
document
+        MetricUtils.updateIndexingErrorMetric(_tableNameWithType, 
TextIndexType.INDEX_DISPLAY_NAME);
+      } else {
+        LOGGER.error("Caught exception while trying to add to native text 
index for table: {}, column: {}",
+            _tableNameWithType, _columnName, e);
+        throw e;
+      }
     }
     _nextDocId++;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
index 5d0996ef746..194ae655f58 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java
@@ -109,9 +109,9 @@ public class H3IndexType extends 
AbstractIndexType<H3IndexConfig, H3IndexReader,
     H3IndexResolution resolution = 
Objects.requireNonNull(indexConfig).getResolution();
     return context.isOnHeap()
         ? new OnHeapH3IndexCreator(context.getIndexDir(), 
context.getFieldSpec().getName(),
-        context.getTableNameWithType(), resolution)
+        context.getTableNameWithType(), context.isContinueOnError(), 
resolution)
         : new OffHeapH3IndexCreator(context.getIndexDir(), 
context.getFieldSpec().getName(),
-            context.getTableNameWithType(), resolution);
+            context.getTableNameWithType(), context.isContinueOnError(), 
resolution);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
index 4943e7d0c3f..17ded67a0e0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
@@ -115,8 +115,9 @@ public class JsonIndexType extends 
AbstractIndexType<JsonIndexConfig, JsonIndexR
     Preconditions.checkState(storedType == DataType.STRING || storedType == 
DataType.MAP,
         "Json index is currently only supported on STRING columns");
     return context.isOnHeap() ? new 
OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
-        indexConfig)
-        : new OffHeapJsonIndexCreator(context.getIndexDir(), 
context.getFieldSpec().getName(), indexConfig);
+        context.getTableNameWithType(), context.isContinueOnError(), 
indexConfig)
+        : new OffHeapJsonIndexCreator(context.getIndexDir(), 
context.getFieldSpec().getName(),
+            context.getTableNameWithType(), context.isContinueOnError(), 
indexConfig);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
index dfe8616bd05..3c36ae2ac63 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java
@@ -116,7 +116,8 @@ public class TextIndexType extends 
AbstractIndexType<TextIndexConfig, TextIndexR
     
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() 
== FieldSpec.DataType.STRING,
         "Text index is currently only supported on STRING type columns");
     if (indexConfig.getFstType() == FSTType.NATIVE) {
-      return new NativeTextIndexCreator(context.getFieldSpec().getName(), 
context.getIndexDir());
+      return new NativeTextIndexCreator(context.getFieldSpec().getName(), 
context.getTableNameWithType(),
+          context.isContinueOnError(), context.getIndexDir());
     } else {
       return new LuceneTextIndexCreator(context, indexConfig);
     }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java
similarity index 54%
copy from 
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java
index 218b4f3961e..29695833b86 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/MetricUtils.java
@@ -16,34 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.index.creator;
+package org.apache.pinot.segment.local.utils;
 
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import java.util.Locale;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 
 
-public interface FSTIndexCreator extends IndexCreator {
+/**
+ * Utils for metrics
+ */
+public class MetricUtils {
 
-  @Override
-  default void add(Object value, int dictId)
-      throws IOException {
-    // FST indexes should do nothing when called for each row
-  }
 
-  @Override
-  default void add(Object[] values, @Nullable int[] dictIds)
-      throws IOException {
-    // FST indexes should do nothing when called for each row
+  private MetricUtils() {
   }
 
-  /**
-   * Adds the next document.
-   */
-  void add(String document);
-
-  /**
-   * Adds a set of documents to the index
-   */
-  void add(String[] document, int length);
+  public static void updateIndexingErrorMetric(String tableNameWithType, 
String indexDisplayName) {
+    String metricKeyName =
+        tableNameWithType + "-" + indexDisplayName.toUpperCase(Locale.US) + 
"-indexingError";
+    ServerMetrics.get().addMeteredTableValue(metricKeyName, 
ServerMeter.INDEXING_FAILURES, 1);
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
index 7493d43db6f..7e1bee3bf57 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/H3IndexTest.java
@@ -92,9 +92,9 @@ public class H3IndexTest implements 
PinotBuffersAfterMethodCheckRule {
 
     try (MutableH3Index mutableH3Index = new 
MutableH3Index(h3IndexResolution)) {
       try (GeoSpatialIndexCreator onHeapCreator = new 
OnHeapH3IndexCreator(TEMP_DIR, onHeapColumnName,
-          "myTable_OFFLINE", h3IndexResolution);
+          "myTable_OFFLINE", false, h3IndexResolution);
           GeoSpatialIndexCreator offHeapCreator = new 
OffHeapH3IndexCreator(TEMP_DIR, offHeapColumnName,
-              "myTable_OFFLINE", h3IndexResolution)) {
+              "myTable_OFFLINE", false, h3IndexResolution)) {
         int docId = 0;
         while (expectedCardinalities.size() < numUniqueH3Ids) {
           double longitude = RANDOM.nextDouble() * 360 - 180;
@@ -134,7 +134,7 @@ public class H3IndexTest implements 
PinotBuffersAfterMethodCheckRule {
     int res = 5;
     H3IndexResolution resolution = new 
H3IndexResolution(Collections.singletonList(res));
 
-    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE",
+    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE", true,
         resolution)) {
       Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new 
Coordinate(10, 20));
       creator.add(point);
@@ -160,7 +160,7 @@ public class H3IndexTest implements 
PinotBuffersAfterMethodCheckRule {
     int res = 5;
     H3IndexResolution resolution = new 
H3IndexResolution(Collections.singletonList(res));
 
-    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE",
+    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE", true,
         resolution)) {
       Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new 
Coordinate(10, 20));
       creator.add(point);
@@ -186,7 +186,7 @@ public class H3IndexTest implements 
PinotBuffersAfterMethodCheckRule {
     int res = 5;
     H3IndexResolution resolution = new 
H3IndexResolution(Collections.singletonList(res));
 
-    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE",
+    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE", true,
         resolution)) {
       Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new 
Coordinate(10, 42));
       creator.add(point);
@@ -208,6 +208,60 @@ public class H3IndexTest implements 
PinotBuffersAfterMethodCheckRule {
     }
   }
 
+  @Test
+  public void testSkipInvalidGeometryContinueOnErrorFalse()
+      throws Exception {
+    String columnName = "skipInvalid";
+    int res = 5;
+    H3IndexResolution resolution = new 
H3IndexResolution(Collections.singletonList(res));
+
+    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE", false,
+        resolution)) {
+      Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new 
Coordinate(10, 20));
+      creator.add(point);
+
+      // With continueOnError=false, invalid serialized bytes must throw instead of being skipped
+      Assert.assertThrows(IllegalStateException.class, () -> creator.add(new 
byte[]{1, 2, 3}, -1));
+    }
+  }
+
+  @Test
+  public void testSkipNullGeometryContinueOnErrorFalse()
+      throws Exception {
+    String columnName = "skipNull";
+    int res = 5;
+    H3IndexResolution resolution = new 
H3IndexResolution(Collections.singletonList(res));
+
+    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE", false,
+        resolution)) {
+      Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new 
Coordinate(10, 20));
+      creator.add(point);
+
+      // With continueOnError=false, an explicit null geometry must also throw instead of being skipped
+      Assert.assertThrows(IllegalStateException.class, () -> 
creator.add(null));
+    }
+  }
+
+  @Test
+  public void testSkipNonPointGeometryContinueOnErrorFalse()
+      throws Exception {
+    String columnName = "skipInvalidGeometryType";
+    int res = 5;
+    H3IndexResolution resolution = new 
H3IndexResolution(Collections.singletonList(res));
+
+    try (GeoSpatialIndexCreator creator = new OnHeapH3IndexCreator(TEMP_DIR, 
columnName, "myTable_OFFLINE", false,
+        resolution)) {
+      Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new 
Coordinate(10, 42));
+      creator.add(point);
+
+      // With continueOnError=false, a non-point geometry (MultiPoint) must also throw instead of being skipped
+      Point[] points = new Point[1];
+      points[0] = point;
+      MultiPoint multiPoint = 
GeometryUtils.GEOMETRY_FACTORY.createMultiPoint(points);
+      Assert.assertThrows(IllegalStateException.class, () -> 
creator.add(multiPoint));
+    }
+  }
+
   public static class ConfTest extends AbstractSerdeIndexContract {
 
     protected void assertEquals(H3IndexConfig expected) {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 8f9b477ae20..b576a443942 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -397,9 +397,24 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
    */
   private void createIndex(boolean createOnHeap, JsonIndexConfig 
jsonIndexConfig, String[] records)
       throws IOException {
+    createIndex(createOnHeap, jsonIndexConfig, records, false);
+  }
+
+  /**
+   * Creates a JSON index with the given config and adds the given records
+   * @param createOnHeap Whether to create an on-heap index
+   * @param jsonIndexConfig the JSON index config
+   * @param records the records to be added to the index
+   * @param continueOnError whether continueOnError should be enabled or 
disabled
+   * @throws IOException on error
+   */
+  private void createIndex(boolean createOnHeap, JsonIndexConfig 
jsonIndexConfig, String[] records, boolean continueOnError)
+      throws IOException {
     try (JsonIndexCreator indexCreator = createOnHeap
-        ? new OnHeapJsonIndexCreator(INDEX_DIR, ON_HEAP_COLUMN_NAME, 
jsonIndexConfig)
-        : new OffHeapJsonIndexCreator(INDEX_DIR, OFF_HEAP_COLUMN_NAME, 
jsonIndexConfig)) {
+        ? new OnHeapJsonIndexCreator(INDEX_DIR, ON_HEAP_COLUMN_NAME, 
"myTable_OFFLINE", continueOnError,
+        jsonIndexConfig)
+        : new OffHeapJsonIndexCreator(INDEX_DIR, OFF_HEAP_COLUMN_NAME, 
"myTable_OFFLINE", continueOnError,
+            jsonIndexConfig)) {
       for (String record : records) {
         indexCreator.add(record);
       }
@@ -453,7 +468,8 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     };
 
     String colName = "col";
-    try (JsonIndexCreator offHeapCreator = new 
OffHeapJsonIndexCreator(INDEX_DIR, colName, getIndexConfig());
+    try (JsonIndexCreator offHeapCreator = new 
OffHeapJsonIndexCreator(INDEX_DIR, colName, "myTable_OFFLINE",
+        false, getIndexConfig());
         MutableJsonIndexImpl mutableIndex = new 
MutableJsonIndexImpl(getIndexConfig(), "table__0__1", "col")) {
       for (String record : records) {
         offHeapCreator.add(record);
@@ -522,7 +538,8 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     // @formatter: on
 
     String colName = "col";
-    try (JsonIndexCreator offHeapCreator = new 
OffHeapJsonIndexCreator(INDEX_DIR, colName, getIndexConfig());
+    try (JsonIndexCreator offHeapCreator = new 
OffHeapJsonIndexCreator(INDEX_DIR, colName, "myTable_OFFLINE",
+        false, getIndexConfig());
         MutableJsonIndexImpl mutableIndex = new 
MutableJsonIndexImpl(getIndexConfig(), "table__0__1", "col")) {
       for (String record : records) {
         offHeapCreator.add(record);
@@ -817,7 +834,7 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
   }
 
   @Test
-  public void testSkipInvalidJsonEnable() throws Exception {
+  public void testSkipInvalidJsonEnableContinueOnErrorFalse() throws Exception 
{
     JsonIndexConfig jsonIndexConfig = getIndexConfig();
     jsonIndexConfig.setSkipInvalidJson(true);
     // the braces don't match and cannot be parsed
@@ -849,8 +866,68 @@ public class JsonIndexTest implements 
PinotBuffersAfterMethodCheckRule {
     }
   }
 
+  @Test
+  public void testSkipInvalidJsonEnableContinueOnErrorTrue() throws Exception {
+    JsonIndexConfig jsonIndexConfig = getIndexConfig();
+    jsonIndexConfig.setSkipInvalidJson(true);
+    // the braces don't match and cannot be parsed
+    String[] records = {"{\"key1\":\"va\""};
+
+    createIndex(true, jsonIndexConfig, records, true);
+    File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(onHeapIndexFile.exists());
+
+    createIndex(false, jsonIndexConfig, records, true);
+    File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(offHeapIndexFile.exists());
+
+    try (PinotDataBuffer onHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+        PinotDataBuffer offHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+        JsonIndexReader onHeapReader = new 
ImmutableJsonIndexReader(onHeapBuffer, records.length);
+        JsonIndexReader offHeapReader = new 
ImmutableJsonIndexReader(offHeapBuffer, records.length);
+        MutableJsonIndexImpl mutableJsonIndex = new 
MutableJsonIndexImpl(jsonIndexConfig, "table__0__1", "col")) {
+      for (String record : records) {
+        mutableJsonIndex.add(record);
+      }
+      Map<String, RoaringBitmap> onHeapRes = getMatchingDocsMap(onHeapReader, 
"$");
+      Map<String, RoaringBitmap> offHeapRes = 
getMatchingDocsMap(offHeapReader, "$");
+      Map<String, RoaringBitmap> mutableRes = 
mutableJsonIndex.getMatchingFlattenedDocsMap("$", null);
+      Object expectedRes = 
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT, 
RoaringBitmap.bitmapOf(0));
+      assertEquals(onHeapRes, expectedRes);
+      assertEquals(offHeapRes, expectedRes);
+      assertEquals(mutableRes, expectedRes);
+    }
+  }
+
+  @Test
+  public void testSkipInvalidJsonDisabledContinueOnErrorTrue() throws 
Exception {
+    // by default, skipInvalidJson is disabled
+    JsonIndexConfig jsonIndexConfig = getIndexConfig();
+    // the braces don't match and cannot be parsed
+    String[] records = {"{\"key1\":\"va\""};
+
+    createIndex(true, jsonIndexConfig, records, true);
+    File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(onHeapIndexFile.exists());
+
+    createIndex(false, jsonIndexConfig, records, true);
+    File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    assertTrue(offHeapIndexFile.exists());
+
+    try (PinotDataBuffer onHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+        PinotDataBuffer offHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+        JsonIndexReader onHeapReader = new 
ImmutableJsonIndexReader(onHeapBuffer, records.length);
+        JsonIndexReader offHeapReader = new 
ImmutableJsonIndexReader(offHeapBuffer, records.length)) {
+      Map<String, RoaringBitmap> onHeapRes = getMatchingDocsMap(onHeapReader, 
"$");
+      Map<String, RoaringBitmap> offHeapRes = 
getMatchingDocsMap(offHeapReader, "$");
+      Object expectedRes = 
Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT, 
RoaringBitmap.bitmapOf(0));
+      assertEquals(onHeapRes, expectedRes);
+      assertEquals(offHeapRes, expectedRes);
+    }
+  }
+
   @Test(expectedExceptions = JsonProcessingException.class)
-  public void testSkipInvalidJsonDisabled() throws Exception {
+  public void testSkipInvalidJsonDisabledContinueOnErrorFalse() throws 
Exception {
     // by default, skipInvalidJson is disabled
     JsonIndexConfig jsonIndexConfig = getIndexConfig();
     // the braces don't match and cannot be parsed
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java
new file mode 100644
index 00000000000..e1ed2625716
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/HnswVectorIndexCreatorTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.vector.HnswVectorIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.readers.vector.HnswVectorIndexReader;
+import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class HnswVectorIndexCreatorTest {
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
HnswVectorIndexCreatorTest.class.toString());
+  private VectorIndexConfig _config;
+
+  @BeforeMethod
+  public void setUp()
+      throws IOException {
+    FileUtils.forceMkdir(INDEX_DIR);
+
+    Map<String, String> properties = new HashMap<>();
+
+    properties.put("vectorIndexType", "HNSW");
+    properties.put("vectorDimension", "1536");
+
+    _config = new VectorIndexConfig(properties);
+    try (HnswVectorIndexCreator creator = new HnswVectorIndexCreator("foo", 
INDEX_DIR, _config)) {
+      float[] values1 = new float[] {5.0F, 42.0F, 54.33333F, 42.24F, 
1001.045F};
+      creator.add(values1);
+      float[] values2 = new float[] {42.0F, 23423.0F, 42431.32532F, 
6785676.3242F, 42.3F};
+      creator.add(values2);
+      float[] values3 = new float[] {1.0F, 2.0F, 3.0F, 4.0F, 5.0F};
+      creator.add(values3);
+      float[] values4 = new float[] {42.678F, 23423423.0F, 42431.32523432F, 
6723485.3242F, 42342.3F};
+      creator.add(values4);
+      creator.seal();
+    }
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  @Test
+  public void testIndexWriterReaderWithTop3()
+      throws IOException {
+    // Use VectorIndex reader to validate that reads work
+    try (HnswVectorIndexReader reader = new HnswVectorIndexReader("foo", INDEX_DIR, 4, _config)) {
+      int[] matchedDocIds = reader.getDocIds(new float[]{5.0F, 42.0F, 54.33333F, 42.24F, 3413.4F}, 3).toArray();
+      // Expect to get 3 matching docIds (0, 2 and 3) since topK = 3 is used; verify every returned position
+      Assert.assertEquals(matchedDocIds.length, 3);
+      Assert.assertEquals(matchedDocIds[0], 0);
+      Assert.assertEquals(matchedDocIds[1], 2);
+      Assert.assertEquals(matchedDocIds[2], 3);
+    }
+  }
+
+  @Test
+  public void testIndexWriterReaderWithTop1()
+      throws IOException {
+    // Use VectorIndex reader to validate that reads work
+    try (HnswVectorIndexReader reader = new HnswVectorIndexReader("foo", 
INDEX_DIR, 4, _config)) {
+      int[] matchedDocIds = reader.getDocIds(new float[]{1.0F, 2.0F, 3.0F, 
4.0F, 5.0F}, 1).toArray();
+      // Expect to get 1 matching docId since topK = 1 is used
+      Assert.assertEquals(matchedDocIds.length, 1);
+      Assert.assertEquals(matchedDocIds[0], 2);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
index 0e88cc73e9f..870983e97ab 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneFSTIndexCreatorTest.java
@@ -25,15 +25,18 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
 import 
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import 
org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader;
+import org.apache.pinot.segment.local.utils.fst.FSTBuilder;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.segment.spi.V1Constants.Indexes.LUCENE_V912_FST_INDEX_FILE_EXTENSION;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 
 
 public class LuceneFSTIndexCreatorTest implements 
PinotBuffersAfterMethodCheckRule {
@@ -59,9 +62,8 @@ public class LuceneFSTIndexCreatorTest implements 
PinotBuffersAfterMethodCheckRu
     uniqueValues[1] = "hello-world123";
     uniqueValues[2] = "still";
 
-    FieldSpec fieldSpec = new DimensionFieldSpec("testFSTColumn", 
FieldSpec.DataType.STRING, true);
-    LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(
-        INDEX_DIR, "testFSTColumn", uniqueValues);
+    LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(INDEX_DIR, 
"testFSTColumn", "myTable_OFFLINE",
+        false, uniqueValues);
     creator.seal();
     File fstFile = new File(INDEX_DIR, "testFSTColumn" + 
LUCENE_V912_FST_INDEX_FILE_EXTENSION);
     try (PinotDataBuffer pinotDataBuffer =
@@ -69,12 +71,64 @@ public class LuceneFSTIndexCreatorTest implements 
PinotBuffersAfterMethodCheckRu
         LuceneFSTIndexReader reader = new 
LuceneFSTIndexReader(pinotDataBuffer)) {
 
       int[] matchedDictIds = reader.getDictIds("hello.*").toArray();
-      Assert.assertEquals(2, matchedDictIds.length);
-      Assert.assertEquals(0, matchedDictIds[0]);
-      Assert.assertEquals(1, matchedDictIds[1]);
+      Assert.assertEquals(matchedDictIds.length, 2);
+      Assert.assertEquals(matchedDictIds[0], 0);
+      Assert.assertEquals(matchedDictIds[1], 1);
 
       matchedDictIds = reader.getDictIds(".*llo").toArray();
-      Assert.assertEquals(0, matchedDictIds.length);
+      Assert.assertEquals(matchedDictIds.length, 0);
+
+      matchedDictIds = reader.getDictIds("st.*").toArray();
+      Assert.assertEquals(matchedDictIds.length, 1);
+      Assert.assertEquals(matchedDictIds[0], 2);
+    }
+  }
+
+  @Test
+  public void testIndexWriterReaderWithAddExceptionsContinueOnErrorTrue()
+      throws IOException {
+    String[] uniqueValues = new String[3];
+    uniqueValues[0] = "hello-world";
+    uniqueValues[1] = "hello-world123";
+    uniqueValues[2] = "still";
+
+    FSTBuilder fstBuilder = Mockito.spy(new FSTBuilder());
+    // For the word "still" throw an exception so it is not indexed
+    doThrow(IOException.class).when(fstBuilder).addEntry(eq("still"), 
anyInt());
+    LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(INDEX_DIR, 
"testFSTColumn", "myTable_OFFLINE",
+        true, uniqueValues, fstBuilder);
+    creator.seal();
+    File fstFile = new File(INDEX_DIR, "testFSTColumn" + 
LUCENE_V912_FST_INDEX_FILE_EXTENSION);
+    try (PinotDataBuffer pinotDataBuffer =
+        PinotDataBuffer.mapFile(fstFile, true, 0, fstFile.length(), 
ByteOrder.BIG_ENDIAN, "fstIndexFile");
+        LuceneFSTIndexReader reader = new 
LuceneFSTIndexReader(pinotDataBuffer)) {
+
+      int[] matchedDictIds = reader.getDictIds("hello.*").toArray();
+      Assert.assertEquals(matchedDictIds.length, 2);
+      Assert.assertEquals(matchedDictIds[0], 0);
+      Assert.assertEquals(matchedDictIds[1], 1);
+
+      matchedDictIds = reader.getDictIds(".*llo").toArray();
+      Assert.assertEquals(matchedDictIds.length, 0);
+
+      // Validate that nothing matches st.*
+      matchedDictIds = reader.getDictIds("st.*").toArray();
+      Assert.assertEquals(matchedDictIds.length, 0);
     }
   }
+
+  @Test
+  public void testIndexWriterReaderWithAddExceptionsContinueOnErrorFalse()
+      throws IOException {
+    String[] uniqueValues = new String[3];
+    uniqueValues[0] = "hello-world";
+    uniqueValues[1] = "hello-world123";
+    uniqueValues[2] = "still";
+
+    FSTBuilder fstBuilder = Mockito.spy(new FSTBuilder());
+    // For the word "still" throw an exception; with continueOnError=false the constructor must propagate it
+    doThrow(IOException.class).when(fstBuilder).addEntry(eq("still"), 
anyInt());
+    Assert.assertThrows(IOException.class, () -> new 
LuceneFSTIndexCreator(INDEX_DIR, "testFSTColumn",
+        "myTable_OFFLINE", false, uniqueValues, fstBuilder));
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java
new file mode 100644
index 00000000000..c8278188be6
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/LuceneTextIndexCreatorTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
+import org.apache.pinot.segment.spi.index.TextIndexConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class LuceneTextIndexCreatorTest {
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
LuceneTextIndexCreatorTest.class.toString());
+
+  @BeforeMethod
+  public void setUp()
+      throws IOException {
+    FileUtils.forceMkdir(INDEX_DIR);
+
+    TextIndexConfig config = new TextIndexConfig(false, null, null, false, 
false, null, null, true, 500, null, null,
+        null, null, false, false, 0, false, null);
+    try (LuceneTextIndexCreator creator = new LuceneTextIndexCreator("foo", 
INDEX_DIR, true, false, null, null,
+        config)) {
+      creator.add("{\"clean\":\"this\"}");
+      creator.add("{\"retain\":\"this\"}");
+      creator.add("{\"keep\":\"this\"}");
+      creator.add("{\"hold\":\"this\"}");
+      creator.add("{\"clean\":\"that\"}");
+      creator.seal();
+    }
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  @Test
+  public void testIndexWriterReaderMatchClean()
+      throws IOException {
+    // Use TextIndex reader to validate that reads work
+    try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", 
INDEX_DIR, 5, new HashMap<>())) {
+      int[] matchedDocIds = reader.getDocIds("clean").toArray();
+      Assert.assertEquals(matchedDocIds.length, 2);
+      Assert.assertEquals(matchedDocIds[0], 0);
+      Assert.assertEquals(matchedDocIds[1], 4);
+    }
+  }
+
+  @Test
+  public void testIndexWriterReaderMatchHold()
+      throws IOException {
+    // Use TextIndex reader to validate that reads work
+    try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", 
INDEX_DIR, 5, new HashMap<>())) {
+      int[] matchedDocIds = reader.getDocIds("hold").toArray();
+      Assert.assertEquals(matchedDocIds.length, 1);
+      Assert.assertEquals(matchedDocIds[0], 3);
+    }
+  }
+
+  @Test
+  public void testIndexWriterReaderMatchRetain()
+      throws IOException {
+    // Use TextIndex reader to validate that reads work
+    try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", 
INDEX_DIR, 5, new HashMap<>())) {
+      int[] matchedDocIds = reader.getDocIds("retain").toArray();
+      Assert.assertEquals(matchedDocIds.length, 1);
+      Assert.assertEquals(matchedDocIds[0], 1);
+    }
+  }
+
+  @Test
+  public void testIndexWriterReaderMatchWithOrClause()
+      throws IOException {
+    // Use TextIndex reader to validate that reads work
+    try (LuceneTextIndexReader reader = new LuceneTextIndexReader("foo", 
INDEX_DIR, 5, new HashMap<>())) {
+      int[] matchedDocIds = reader.getDocIds("retain|keep").toArray();
+      Assert.assertEquals(matchedDocIds.length, 2);
+      Assert.assertEquals(matchedDocIds[0], 1);
+      Assert.assertEquals(matchedDocIds[1], 2);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
index 34189413aa2..2729c42ef4a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/NativeTextIndexCreatorTest.java
@@ -24,13 +24,15 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.PinotBuffersAfterMethodCheckRule;
 import 
org.apache.pinot.segment.local.segment.creator.impl.text.NativeTextIndexCreator;
 import 
org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.segment.spi.V1Constants.Indexes.NATIVE_TEXT_INDEX_FILE_EXTENSION;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 
 
@@ -58,8 +60,8 @@ public class NativeTextIndexCreatorTest implements 
PinotBuffersAfterMethodCheckR
     uniqueValues[2] = "still";
     uniqueValues[3] = "zoobar";
 
-    FieldSpec fieldSpec = new DimensionFieldSpec("testFSTColumn", 
FieldSpec.DataType.STRING, true);
-    try (NativeTextIndexCreator creator = new 
NativeTextIndexCreator("testFSTColumn", INDEX_DIR)) {
+    try (NativeTextIndexCreator creator = new 
NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE", false,
+        INDEX_DIR)) {
       for (int i = 0; i < 4; i++) {
         creator.add(uniqueValues[i]);
       }
@@ -93,4 +95,81 @@ public class NativeTextIndexCreatorTest implements 
PinotBuffersAfterMethodCheckR
       }
     }
   }
+
+  @Test
+  public void testIndexWriterReaderWithAddExceptionsWithContinueOnErrorTrue()
+      throws IOException {
+    String[] uniqueValues = new String[4];
+    uniqueValues[0] = "still";
+    uniqueValues[1] = "zoobar";
+    uniqueValues[2] = "hello-world";
+    uniqueValues[3] = "hello-world123";
+
+    try (NativeTextIndexCreator creator = spy(new 
NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE",
+        true, INDEX_DIR))) {
+      // Add a couple of words so they show up in the index
+      for (int i = 0; i < 2; i++) {
+        creator.add(uniqueValues[i]);
+      }
+
+      // Throw exception for the remaining words
+      doThrow(RuntimeException.class).when(creator).analyze(anyString());
+      for (int i = 2; i < 4; i++) {
+        creator.add(uniqueValues[i]);
+      }
+
+      creator.seal();
+    }
+
+    File fstFile = new File(INDEX_DIR, "testFSTColumn" + 
NATIVE_TEXT_INDEX_FILE_EXTENSION);
+    try (NativeTextIndexReader reader = new 
NativeTextIndexReader("testFSTColumn", fstFile.getParentFile())) {
+      try {
+        int[] matchedDocIds = reader.getDocIds("hello.*").toArray();
+        assertEquals(matchedDocIds.length, 0);
+
+        matchedDocIds = reader.getDocIds(".*llo").toArray();
+        assertEquals(matchedDocIds.length, 0);
+
+        matchedDocIds = reader.getDocIds("wor.*").toArray();
+        assertEquals(matchedDocIds.length, 0);
+
+        matchedDocIds = reader.getDocIds("zoo.*").toArray();
+        assertEquals(matchedDocIds.length, 1);
+        assertEquals(matchedDocIds[0], 1);
+
+        matchedDocIds = reader.getDocIds(".*il.*").toArray();
+        assertEquals(matchedDocIds.length, 1);
+        assertEquals(matchedDocIds[0], 0);
+
+        matchedDocIds = reader.getDocIds(".*").toArray();
+        assertEquals(matchedDocIds.length, 2);
+        assertEquals(matchedDocIds[0], 0);
+        assertEquals(matchedDocIds[1], 1);
+      } finally {
+        reader.closeInTest();
+      }
+    }
+  }
+
+  @Test
+  public void testIndexWriterReaderWithAddExceptionsWithContinueOnErrorFalse()
+      throws IOException {
+    String[] uniqueValues = new String[4];
+    uniqueValues[0] = "still";
+    uniqueValues[1] = "zoobar";
+    uniqueValues[2] = "hello-world";
+    uniqueValues[3] = "hello-world123";
+
+    try (NativeTextIndexCreator creator = spy(new 
NativeTextIndexCreator("testFSTColumn", "myTable_OFFLINE",
+        false, INDEX_DIR))) {
+      // Add a couple of words so they show up in the index
+      for (int i = 0; i < 2; i++) {
+        creator.add(uniqueValues[i]);
+      }
+
+      // Make analyze() throw; with continueOnError=false the exception must propagate out of add()
+      doThrow(RuntimeException.class).when(creator).analyze(anyString());
+      Assert.assertThrows(RuntimeException.class, () -> 
creator.add(uniqueValues[2]));
+    }
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
index e1349741bfd..f41133629b5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
@@ -178,7 +178,7 @@ public class FilePerIndexDirectoryTest implements PinotBuffersAfterMethodCheckRu
       throws IOException {
     // See https://github.com/apache/pinot/issues/11529
     try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
-        NativeTextIndexCreator fooCreator = new NativeTextIndexCreator("foo", 
TEMP_DIR)) {
+        NativeTextIndexCreator fooCreator = new NativeTextIndexCreator("foo", 
"myTable_OFFLINE", false, TEMP_DIR)) {
 
       fooCreator.add("{\"clean\":\"this\"}");
       fooCreator.seal();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
index 218b4f3961e..f2174082342 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/FSTIndexCreator.java
@@ -40,7 +40,8 @@ public interface FSTIndexCreator extends IndexCreator {
   /**
    * Adds the next document.
    */
-  void add(String document);
+  void add(String document)
+      throws IOException;
 
   /**
    * Adds a set of documents to the index
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
index 62be6acc478..7fef8e04827 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.index.IndexCreator;
-import org.apache.pinot.spi.utils.JsonUtils;
 
 
 /**
@@ -36,7 +35,7 @@ public interface JsonIndexCreator extends IndexCreator {
   default void add(Object value, int dictId)
       throws IOException {
     if (value instanceof Map) {
-      add(JsonUtils.objectToString(value));
+      add((Map) value);
     } else {
       add((String) value);
     }
@@ -52,6 +51,12 @@ public interface JsonIndexCreator extends IndexCreator {
   void add(String jsonString)
       throws IOException;
 
+  /**
+   * Adds the next JSON value supplied as a {@link Map}; {@link #add(Object, int)} routes Map values here.
+   */
+  void add(Map jsonMap)
+      throws IOException;
+
   /**
    * Seals the index and flushes it to disk.
    */
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index ff2309968fa..83b7d351b89 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -79,7 +79,7 @@ public class JsonUtils {
   public static final String ARRAY_INDEX_KEY = ".$index";
   public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$";
   public static final int MAX_COMBINATIONS = 100_000;
-  private static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD =
+  public static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD =
       Collections.singletonList(Collections.singletonMap(VALUE_KEY, SKIPPED_VALUE_REPLACEMENT));
 
   // For querying
@@ -736,14 +736,14 @@ public class JsonUtils {
     JsonNode jsonNode;
     try {
       jsonNode = JsonUtils.stringToJsonNode(jsonString);
-    } catch (JsonProcessingException e) {
+      return JsonUtils.flatten(jsonNode, jsonIndexConfig);
+    } catch (Exception e) {
       if (jsonIndexConfig.getSkipInvalidJson()) {
         return SKIPPED_FLATTENED_RECORD;
       } else {
         throw e;
       }
     }
-    return JsonUtils.flatten(jsonNode, jsonIndexConfig);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to