This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4b3886064f Allow to build index on preserved field in
SchemaConformingTransformer (#13993)
4b3886064f is described below
commit 4b3886064f75b68cd014240104f3da8260e2cb9a
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Sep 13 21:37:27 2024 -0700
Allow to build index on preserved field in SchemaConformingTransformer
(#13993)
---
.../SchemaConformingTransformerV2.java | 22 ++++++-
.../SchemaConformingTransformerV2Test.java | 69 +++++++++++++++-------
.../SchemaConformingTransformerV2Config.java | 18 ++++++
3 files changed, 86 insertions(+), 23 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java
index 47b629f522..923b49625c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java
@@ -382,8 +382,13 @@ public class SchemaConformingTransformerV2 implements RecordTransformer {
}
String keyJsonPath = String.join(".", jsonPath);
- if
(_transformerConfig.getFieldPathsToPreserveInput().contains(keyJsonPath)) {
+
+ if (_transformerConfig.getFieldPathsToPreserveInput().contains(keyJsonPath)
+ ||
_transformerConfig.getFieldPathsToPreserveInputWithIndex().contains(keyJsonPath))
{
outputRecord.putValue(keyJsonPath, value);
+ if
(_transformerConfig.getFieldPathsToPreserveInputWithIndex().contains(keyJsonPath))
{
+ flattenAndAddToMergedTextIndexMap(mergedTextIndexMap, keyJsonPath,
value);
+ }
return extraFieldsContainer;
}
@@ -592,6 +597,21 @@ public class SchemaConformingTransformerV2 implements RecordTransformer {
_mergedTextIndexDocumentBytesCount / _mergedTextIndexDocumentCount);
}
+ private void flattenAndAddToMergedTextIndexMap(Map<String, Object>
mergedTextIndexMap, String key, Object value) {
+ String unindexableFieldSuffix =
_transformerConfig.getUnindexableFieldSuffix();
+ if (null != unindexableFieldSuffix &&
key.endsWith(unindexableFieldSuffix)) {
+ return;
+ }
+ if (value instanceof Map) {
+ Map<String, Object> map = (Map<String, Object>) value;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ flattenAndAddToMergedTextIndexMap(mergedTextIndexMap, key + "." +
entry.getKey(), entry.getValue());
+ }
+ } else {
+ mergedTextIndexMap.put(key, value);
+ }
+ }
+
/**
* Converts (if necessary) and adds the given extras field to the output
record
*/
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java
index cd1d85dc1d..6ea6d66cf9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java
@@ -67,6 +67,7 @@ public class SchemaConformingTransformerV2Test {
private static final String TEST_JSON_NULL_FIELD_NAME = "nullField";
private static final String TEST_JSON_STRING_FIELD_NAME = "stringField";
private static final String TEST_JSON_MAP_FIELD_NAME = "mapField";
+ private static final String TEST_JSON_MAP_EXTRA_FIELD_NAME = "mapFieldExtra";
private static final String TEST_JSON_MAP_NO_IDX_FIELD_NAME =
"mapField_noIndex";
private static final String TEST_JSON_NESTED_MAP_FIELD_NAME = "nestedFields";
private static final String TEST_JSON_INT_NO_IDX_FIELD_NAME =
"intField_noIndex";
@@ -97,7 +98,7 @@ public class SchemaConformingTransformerV2Test {
IngestionConfig ingestionConfig = new IngestionConfig();
SchemaConformingTransformerV2Config schemaConformingTransformerV2Config =
new SchemaConformingTransformerV2Config(true,
INDEXABLE_EXTRAS_FIELD_NAME, true, UNINDEXABLE_EXTRAS_FIELD_NAME,
- UNINDEXABLE_FIELD_SUFFIX, null, null, null, null, null, null,
null, null);
+ UNINDEXABLE_FIELD_SUFFIX, null, null, null, null, null, null,
null, null, null);
ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config);
return new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
.build();
@@ -105,12 +106,12 @@ public class SchemaConformingTransformerV2Test {
private static TableConfig createDefaultTableConfig(String
indexableExtrasField, String unindexableExtrasField,
String unindexableFieldSuffix, Set<String> fieldPathsToDrop, Set<String>
fieldPathsToPreserve,
- String mergedTextIndexField) {
+ Set<String> fieldPathToPreserverWithIndex, String mergedTextIndexField) {
IngestionConfig ingestionConfig = new IngestionConfig();
SchemaConformingTransformerV2Config schemaConformingTransformerV2Config =
new SchemaConformingTransformerV2Config(indexableExtrasField != null,
indexableExtrasField,
unindexableExtrasField != null, unindexableExtrasField,
unindexableFieldSuffix, fieldPathsToDrop,
- fieldPathsToPreserve, mergedTextIndexField, null, null, null,
null, null);
+ fieldPathsToPreserve, fieldPathToPreserverWithIndex,
mergedTextIndexField, null, null, null, null, null);
ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config);
return new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
.build();
@@ -600,6 +601,13 @@ public class SchemaConformingTransformerV2Test {
"intField_noIndex":9,
"string_noIndex":"z"
},
+ "mapFieldExtra":{
+ "arrayField":[0, 1, 2, 3],
+ "nullField":null,
+ "stringField":"a",
+ "intField_noIndex":9,
+ "string_noIndex":"z"
+ },
"mapField_noIndex":{
"arrayField":[0, 1, 2, 3],
"nullField":null,
@@ -627,6 +635,7 @@ public class SchemaConformingTransformerV2Test {
.set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
.set(TEST_JSON_STRING_NO_IDX_FIELD_NAME,
TEST_JSON_STRING_NO_IDX_NODE)
.set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
+ .set(TEST_JSON_MAP_EXTRA_FIELD_NAME,
TEST_JSON_MAP_NODE_WITH_NO_IDX)
.set(TEST_JSON_MAP_NO_IDX_FIELD_NAME,
TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME,
TEST_JSON_ARRAY_NODE)
.set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
@@ -646,6 +655,7 @@ public class SchemaConformingTransformerV2Test {
// map the column someMeaningfulName to nestedFields.stringField
schemaBuilder =
createDefaultSchemaBuilder().addSingleValueDimension("arrayField",
DataType.STRING)
.addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING)
+ .addSingleValueDimension(TEST_JSON_MAP_EXTRA_FIELD_NAME,
DataType.STRING)
.addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME,
DataType.JSON)
.addSingleValueDimension(destColumnName, DataType.STRING);
@@ -664,6 +674,11 @@ public class SchemaConformingTransformerV2Test {
add(TEST_JSON_MAP_FIELD_NAME);
}
};
+ Set<String> pathToPreserveWithIndex = new HashSet<>() {
+ {
+ add(TEST_JSON_MAP_EXTRA_FIELD_NAME);
+ }
+ };
/*
{
@@ -675,6 +690,13 @@ public class SchemaConformingTransformerV2Test {
"stringField":"a",
"intField_noIndex":9,
"string_noIndex":"z"
+ },
+ "mapFieldExtra":{
+ "arrayField":[0,1,2,3],
+ "nullField":null,
+ "stringField":"a",
+ "intField_noIndex":9,
+ "string_noIndex":"z"
}
"indexableExtras":{
"nullField":null,
@@ -698,8 +720,7 @@ public class SchemaConformingTransformerV2Test {
}
},
__mergedTextIndex: [
- "[0, 1, 2, 3]:arrayField", "a:stringField",
- "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField",
+ // check expectedJsonNodeWithMergedTextIndex
]
}
*/
@@ -707,6 +728,7 @@ public class SchemaConformingTransformerV2Test {
.set(TEST_JSON_ARRAY_FIELD_NAME, N.textNode("[0,1,2,3]"))
.set(destColumnName, TEST_JSON_STRING_NODE)
.set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
+ .set(TEST_JSON_MAP_EXTRA_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
.set(INDEXABLE_EXTRAS_FIELD_NAME,
CustomObjectNode.create().set(TEST_JSON_NULL_FIELD_NAME,
TEST_JSON_NULL_NODE)
.set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
@@ -722,41 +744,44 @@ public class SchemaConformingTransformerV2Test {
.set(TEST_JSON_STRING_NO_IDX_FIELD_NAME,
TEST_JSON_STRING_NO_IDX_NODE)));
expectedJsonNodeWithMergedTextIndex =
expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME,
-
N.arrayNode().add("0:arrayField").add("1:arrayField").add("2:arrayField").add("3:arrayField").
-
add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:nestedFields.arrayField").
-
add("0:nestedFields.arrayField").add("1:nestedFields.arrayField").add("2:nestedFields.arrayField").
-
add("3:nestedFields.arrayField").add("a:nestedFields.stringField"));
+
N.arrayNode().add("0:arrayField").add("1:arrayField").add("2:arrayField").add("3:arrayField")
+
.add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:nestedFields.arrayField")
+
.add("0:nestedFields.arrayField").add("1:nestedFields.arrayField").add("2:nestedFields.arrayField")
+ .add("3:nestedFields.arrayField").add("a:nestedFields.stringField")
+
.add("[0,1,2,3]:mapFieldExtra.arrayField").add("a:mapFieldExtra.stringField")
+
.add("0:mapFieldExtra.arrayField").add("1:mapFieldExtra.arrayField")
+
.add("2:mapFieldExtra.arrayField").add("3:mapFieldExtra.arrayField"));
transformKeyValueTransformation(
schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME,
DataType.STRING).build(), keyMapping,
- pathToDrop, pathToPreserve, inputJsonNode,
expectedJsonNodeWithMergedTextIndex);
+ pathToDrop, pathToPreserve, pathToPreserveWithIndex, inputJsonNode,
expectedJsonNodeWithMergedTextIndex);
}
private void transformWithIndexableFields(Schema schema, JsonNode
inputRecordJsonNode, JsonNode ouputRecordJsonNode) {
- testTransform(INDEXABLE_EXTRAS_FIELD_NAME, null, null, schema, null, null,
null, inputRecordJsonNode.toString(),
- ouputRecordJsonNode.toString());
+ testTransform(INDEXABLE_EXTRAS_FIELD_NAME, null, null, schema, null, null,
null, null,
+ inputRecordJsonNode.toString(), ouputRecordJsonNode.toString());
}
private void transformWithUnIndexableFieldsAndMergedTextIndex(Schema schema,
JsonNode inputRecordJsonNode,
JsonNode ouputRecordJsonNode) {
testTransform(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME,
MERGED_TEXT_INDEX_FIELD_NAME, schema,
- null, null, null, inputRecordJsonNode.toString(),
ouputRecordJsonNode.toString());
+ null, null, null, null, inputRecordJsonNode.toString(),
ouputRecordJsonNode.toString());
}
private void transformKeyValueTransformation(Schema schema, Map<String,
String> keyMapping,
- Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve, JsonNode
inputRecordJsonNode,
- JsonNode ouputRecordJsonNode) {
+ Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve,
Set<String> fieldPathsToPreserveWithIndex,
+ JsonNode inputRecordJsonNode, JsonNode ouputRecordJsonNode) {
testTransform(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME,
MERGED_TEXT_INDEX_FIELD_NAME, schema,
- keyMapping, fieldPathsToDrop, fieldPathsToPreserve,
inputRecordJsonNode.toString(),
- ouputRecordJsonNode.toString());
+ keyMapping, fieldPathsToDrop, fieldPathsToPreserve,
fieldPathsToPreserveWithIndex,
+ inputRecordJsonNode.toString(), ouputRecordJsonNode.toString());
}
private void testTransform(String indexableExtrasField, String
unindexableExtrasField, String mergedTextIndexField,
Schema schema, Map<String, String> keyMapping, Set<String>
fieldPathsToDrop, Set<String> fieldPathsToPreserve,
- String inputRecordJSONString,
+ Set<String> fieldPathsToPreserveWithIndex, String inputRecordJSONString,
String expectedOutputRecordJSONString) {
TableConfig tableConfig =
createDefaultTableConfig(indexableExtrasField, unindexableExtrasField,
UNINDEXABLE_FIELD_SUFFIX,
- fieldPathsToDrop, fieldPathsToPreserve, mergedTextIndexField);
+ fieldPathsToDrop, fieldPathsToPreserve,
fieldPathsToPreserveWithIndex, mergedTextIndexField);
tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config().setColumnNameToJsonKeyPathMap(keyMapping);
GenericRow outputRecord = transformRow(tableConfig, schema,
inputRecordJSONString);
Map<String, Object> expectedOutputRecordMap =
jsonStringToMap(expectedOutputRecordJSONString);
@@ -820,8 +845,8 @@ public class SchemaConformingTransformerV2Test {
Schema schema =
createDefaultSchemaBuilder().addSingleValueDimension("a.b", DataType.STRING)
.addSingleValueDimension("a.b.c", DataType.INT).build();
SchemaConformingTransformerV2.validateSchema(schema,
- new SchemaConformingTransformerV2Config(null,
INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null, null,
- null, null, null, null, null));
+ new SchemaConformingTransformerV2Config(null,
INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null,
+ null, null, null, null, null, null, null));
} catch (Exception ex) {
fail("Should not have thrown any exception when overlapping schema
occurs");
}
@@ -832,7 +857,7 @@ public class SchemaConformingTransformerV2Test {
.addSingleValueDimension("a.b", DataType.STRING).build();
SchemaConformingTransformerV2.validateSchema(schema,
new SchemaConformingTransformerV2Config(null,
INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null, null,
- null, null, null, null, null));
+ null, null, null, null, null, null));
} catch (Exception ex) {
fail("Should not have thrown any exception when overlapping schema
occurs");
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java
index 1d58d76f81..5bc8e3e340 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java
@@ -54,6 +54,10 @@ public class SchemaConformingTransformerV2Config extends BaseJsonConfig {
+ "input. This will also skip building mergedTextIndex for the field.")
private Set<String> _fieldPathsToPreserveInput = new HashSet<>();
+ @JsonPropertyDescription("Array of flattened (dot-delimited) object paths
not to traverse further and keep same as "
+ + "input. This will NOT skip building mergedTextIndex for the field.")
+ private Set<String> _fieldPathsToPreserveInputWithIndex = new HashSet<>();
+
@JsonPropertyDescription("Map from customized meaningful column name to json
key path")
private Map<String, String> _columnNameToJsonKeyPathMap = new HashMap<>();
@@ -95,6 +99,7 @@ public class SchemaConformingTransformerV2Config extends BaseJsonConfig {
@JsonProperty("unindexableFieldSuffix") @Nullable String
unindexableFieldSuffix,
@JsonProperty("fieldPathsToDrop") @Nullable Set<String> fieldPathsToDrop,
@JsonProperty("fieldPathsToKeepSameAsInput") @Nullable Set<String>
fieldPathsToPreserveInput,
+ @JsonProperty("fieldPathsToKeepSameAsInputWithIndex") @Nullable
Set<String> fieldPathsToPreserveInputWithIndex,
@JsonProperty("mergedTextIndexField") @Nullable String
mergedTextIndexField,
@JsonProperty("mergedTextIndexDocumentMaxLength") @Nullable Integer
mergedTextIndexDocumentMaxLength,
@JsonProperty("mergedTextIndexShinglingOverlapLength") @Nullable Integer
mergedTextIndexShinglingOverlapLength,
@@ -110,6 +115,7 @@ public class SchemaConformingTransformerV2Config extends BaseJsonConfig {
setUnindexableFieldSuffix(unindexableFieldSuffix);
setFieldPathsToDrop(fieldPathsToDrop);
setFieldPathsToPreserveInput(fieldPathsToPreserveInput);
+ setFieldPathsToPreserveInputWithIndex(fieldPathsToPreserveInputWithIndex);
setMergedTextIndexField(mergedTextIndexField);
setMergedTextIndexDocumentMaxLength(mergedTextIndexDocumentMaxLength);
@@ -175,6 +181,18 @@ public class SchemaConformingTransformerV2Config extends BaseJsonConfig {
return this;
}
+ public Set<String> getFieldPathsToPreserveInputWithIndex() {
+ return _fieldPathsToPreserveInputWithIndex;
+ }
+
+ public SchemaConformingTransformerV2Config
setFieldPathsToPreserveInputWithIndex(
+ Set<String> fieldPathsToPreserveInputWithIndex) {
+ _fieldPathsToPreserveInputWithIndex =
+ fieldPathsToPreserveInputWithIndex == null ?
_fieldPathsToPreserveInputWithIndex
+ : fieldPathsToPreserveInputWithIndex;
+ return this;
+ }
+
public Map<String, String> getColumnNameToJsonKeyPathMap() {
return _columnNameToJsonKeyPathMap;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]