This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8684e046c0 Skip invalid json string rather than throwing error during json indexing (#12238) 8684e046c0 is described below commit 8684e046c0bce504224a3a6179be6b51117511ce Author: Xuanyi Li <xuany...@uber.com> AuthorDate: Thu Feb 8 15:20:21 2024 -0800 Skip invalid json string rather than throwing error during json indexing (#12238) --- .../realtime/impl/json/MutableJsonIndexImpl.java | 3 +- .../impl/inv/json/BaseJsonIndexCreator.java | 2 +- .../segment/local/segment/index/JsonIndexTest.java | 47 ++++++++++++++++++++++ .../pinot/spi/config/table/JsonIndexConfig.java | 20 +++++++-- .../java/org/apache/pinot/spi/utils/JsonUtils.java | 19 ++++++++- 5 files changed, 84 insertions(+), 7 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java index 46e7798204..7e632a19b2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java @@ -78,8 +78,7 @@ public class MutableJsonIndexImpl implements MutableJsonIndex { public void add(String jsonString) throws IOException { try { - List<Map<String, String>> flattenedRecords = - JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig); + List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonString, _jsonIndexConfig); _writeLock.lock(); try { addFlattenedRecords(flattenedRecords); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java index 264bc6044b..5c3409fe45 
100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java @@ -91,7 +91,7 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator { @Override public void add(String jsonString) throws IOException { - addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig)); + addFlattenedRecords(JsonUtils.flatten(jsonString, _jsonIndexConfig)); } /** diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java index 4d3cbf8cae..461f8eb93e 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java @@ -18,9 +18,11 @@ */ package org.apache.pinot.segment.local.segment.index; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -69,6 +71,7 @@ public class JsonIndexTest { FileUtils.deleteDirectory(INDEX_DIR); } + @Test public void testSmallIndex() throws Exception { @@ -371,6 +374,50 @@ public class JsonIndexTest { } } + @Test + public void testSkipInvalidJsonEnable() throws Exception { + JsonIndexConfig jsonIndexConfig = new JsonIndexConfig(); + jsonIndexConfig.setSkipInvalidJson(true); + // the braces don't match and cannot be parsed + String[] records = {"{\"key1\":\"va\""}; + + createIndex(true, jsonIndexConfig, records); + File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + 
V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + Assert.assertTrue(onHeapIndexFile.exists()); + + createIndex(false, jsonIndexConfig, records); + File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + Assert.assertTrue(offHeapIndexFile.exists()); + + try (PinotDataBuffer onHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile); + PinotDataBuffer offHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile); + JsonIndexReader onHeapIndexReader = new ImmutableJsonIndexReader(onHeapDataBuffer, records.length); + JsonIndexReader offHeapIndexReader = new ImmutableJsonIndexReader(offHeapDataBuffer, records.length); + MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(jsonIndexConfig)) { + for (String record : records) { + mutableJsonIndex.add(record); + } + Map<String, RoaringBitmap> onHeapRes = onHeapIndexReader.getMatchingDocsMap(""); + Map<String, RoaringBitmap> offHeapRes = offHeapIndexReader.getMatchingDocsMap(""); + Map<String, RoaringBitmap> mutableRes = mutableJsonIndex.getMatchingDocsMap(""); + Map<String, RoaringBitmap> expectedRes = Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT, + RoaringBitmap.bitmapOf(0)); + Assert.assertEquals(expectedRes, onHeapRes); + Assert.assertEquals(expectedRes, offHeapRes); + Assert.assertEquals(expectedRes, mutableRes); + } + } + + @Test(expectedExceptions = JsonProcessingException.class) + public void testSkipInvalidJsonDisabled() throws Exception { + // by default, skipInvalidJson is disabled + JsonIndexConfig jsonIndexConfig = new JsonIndexConfig(); + // the braces don't match and cannot be parsed + String[] records = {"{\"key1\":\"va\""}; + + createIndex(true, jsonIndexConfig, records); + } + public static class ConfTest extends AbstractSerdeIndexContract { @Test diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java index cada2fe4a4..1a0964138e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java @@ -41,6 +41,8 @@ import javax.annotation.Nullable; * - excludeFields: Exclude the given fields, e.g. "b", "c", even if it is under the included paths. * - maxValueLength: Exclude field values which are longer than this length. A value of "0" disables this filter. * Excluded values will be replaced with JsonUtils.SKIPPED_VALUE_REPLACEMENT. + * - skipInvalidJson: If the raw data is not a valid json string, then replace with {"":SKIPPED_VALUE_REPLACEMENT} + * and continue indexing on following Json records. */ public class JsonIndexConfig extends IndexConfig { public static final JsonIndexConfig DISABLED = new JsonIndexConfig(true); @@ -52,6 +54,7 @@ public class JsonIndexConfig extends IndexConfig { private Set<String> _excludePaths; private Set<String> _excludeFields; private int _maxValueLength = 0; + private boolean _skipInvalidJson = false; public JsonIndexConfig() { super(false); @@ -68,7 +71,8 @@ public class JsonIndexConfig extends IndexConfig { @JsonProperty("includePaths") @Nullable Set<String> includePaths, @JsonProperty("excludePaths") @Nullable Set<String> excludePaths, @JsonProperty("excludeFields") @Nullable Set<String> excludeFields, - @JsonProperty("maxValueLength") int maxValueLength) { + @JsonProperty("maxValueLength") int maxValueLength, + @JsonProperty("skipInvalidJson") boolean skipInvalidJson) { super(disabled); _maxLevels = maxLevels; _excludeArray = excludeArray; @@ -77,6 +81,7 @@ public class JsonIndexConfig extends IndexConfig { _excludePaths = excludePaths; _excludeFields = excludeFields; _maxValueLength = maxValueLength; + _skipInvalidJson = skipInvalidJson; } public int getMaxLevels() { @@ -143,6 +148,14 @@ public class JsonIndexConfig extends IndexConfig 
{ _maxValueLength = maxValueLength; } + public boolean getSkipInvalidJson() { + return _skipInvalidJson; + } + + public void setSkipInvalidJson(boolean skipInvalidJson) { + _skipInvalidJson = skipInvalidJson; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -158,12 +171,13 @@ public class JsonIndexConfig extends IndexConfig { return _maxLevels == config._maxLevels && _excludeArray == config._excludeArray && _disableCrossArrayUnnest == config._disableCrossArrayUnnest && Objects.equals(_includePaths, config._includePaths) && Objects.equals(_excludePaths, config._excludePaths) && Objects.equals(_excludeFields, - config._excludeFields) && _maxValueLength == config._maxValueLength; + config._excludeFields) && _maxValueLength == config._maxValueLength + && _skipInvalidJson == config._skipInvalidJson; } @Override public int hashCode() { return Objects.hash(super.hashCode(), _maxLevels, _excludeArray, _disableCrossArrayUnnest, _includePaths, - _excludePaths, _excludeFields, _maxValueLength); + _excludePaths, _excludeFields, _maxValueLength, _skipInvalidJson); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java index 8593b2f8cb..396c21ad74 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java @@ -76,6 +76,8 @@ public class JsonUtils { public static final String ARRAY_INDEX_KEY = ".$index"; public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$"; public static final int MAX_COMBINATIONS = 100_000; + private static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD = + Collections.singletonList(Collections.singletonMap(VALUE_KEY, SKIPPED_VALUE_REPLACEMENT)); // For querying public static final String WILDCARD = "*"; @@ -356,7 +358,7 @@ public class JsonUtils { * ] * </pre> */ - public static List<Map<String, String>> flatten(JsonNode node, 
JsonIndexConfig jsonIndexConfig) { + protected static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) { try { return flatten(node, jsonIndexConfig, 0, "$", false); } catch (OutOfMemoryError oom) { @@ -719,4 +721,19 @@ public class JsonUtils { } } } + + public static List<Map<String, String>> flatten(String jsonString, JsonIndexConfig jsonIndexConfig) + throws IOException { + JsonNode jsonNode; + try { + jsonNode = JsonUtils.stringToJsonNode(jsonString); + } catch (JsonProcessingException e) { + if (jsonIndexConfig.getSkipInvalidJson()) { + return SKIPPED_FLATTENED_RECORD; + } else { + throw e; + } + } + return JsonUtils.flatten(jsonNode, jsonIndexConfig); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org