This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 13673f1150 Add SchemaConformingTransformerV2 to enhance text search abilities (#12788) 13673f1150 is described below commit 13673f11508f00ae35f5bb12f9cf97b6897ebfba Author: lnbest0707 <106711887+lnbest0707-u...@users.noreply.github.com> AuthorDate: Tue Apr 9 16:11:06 2024 -0700 Add SchemaConformingTransformerV2 to enhance text search abilities (#12788) * Add SchemaConformingTransformerV2 to enhance text search abilities * Fix style * Update __mergedTextIndex field logics * Fix UT * Resolve comments and add fieldPathsToPreserveInput config --- .../apache/pinot/common/metrics/ServerGauge.java | 1 + .../apache/pinot/common/metrics/ServerMeter.java | 1 + .../apache/pinot/queries/TransformQueriesTest.java | 2 +- .../recordtransformer/CompositeTransformer.java | 12 +- .../SchemaConformingTransformer.java | 31 +- .../SchemaConformingTransformerV2.java | 727 ++++++++++++++++ .../pinot/segment/local/utils/Base64Utils.java | 44 + .../pinot/segment/local/utils/IngestionUtils.java | 3 +- .../segment/local/utils/TableConfigUtils.java | 8 + .../SchemaConformingTransformerV2Test.java | 934 +++++++++++++++++++++ .../pinot/segment/local/utils/Base64UtilsTest.java | 96 +++ .../config/table/ingestion/IngestionConfig.java | 15 + .../SchemaConformingTransformerConfig.java | 4 +- .../SchemaConformingTransformerV2Config.java | 253 ++++++ 14 files changed, 2120 insertions(+), 11 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index 45f34803a0..f0a1fdd136 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -38,6 +38,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge { LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS("seconds", false), LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS("seconds", false), REALTIME_OFFHEAP_MEMORY_USED("bytes", false), + REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN("bytes", false), REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false), LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true), // Upsert metrics diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 02005a3814..ed9769a68e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -40,6 +40,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { INVALID_REALTIME_ROWS_DROPPED("rows", false), INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false), REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true), + REALTIME_MERGED_TEXT_IDX_TRUNCATED_DOCUMENT_SIZE("bytes", false), REALTIME_OFFSET_COMMITS("commits", true), REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false), STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false), diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java index 1f04d16d3b..cfb570d80e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java @@ -135,7 +135,7 @@ public class TransformQueriesTest 
extends BaseQueriesTest { .setIngestionConfig(new IngestionConfig(null, null, null, null, Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || " + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")), - null, null, null)) + null, null, null, null)) .build(); Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index cf88629f10..a1bfcba52a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -54,8 +54,13 @@ public class CompositeTransformer implements RecordTransformer { * records that have varying fields to a fixed schema without dropping any fields * </li> * <li> - * {@link DataTypeTransformer} after {@link SchemaConformingTransformer} to convert values to comply with the - * schema + * Optional {@link SchemaConformingTransformerV2} after {@link FilterTransformer}, so that we can transform + * input records that have varying fields to a fixed schema and keep or drop other fields by configuration. It + * also provides enhanced text search capabilities. + * </li> + * <li> + * {@link DataTypeTransformer} after {@link SchemaConformingTransformer} or {@link SchemaConformingTransformerV2} + * to convert values to comply with the schema + * </li> * <li> * Optional {@link TimeValidationTransformer} after {@link DataTypeTransformer} so that time value is converted to @@ -78,7 +83,8 @@ public class CompositeTransformer implements RecordTransformer { */ public static List<RecordTransformer> getDefaultTransformers(TableConfig tableConfig, Schema schema) { return Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig), - new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema), + new SchemaConformingTransformer(tableConfig, schema), + new SchemaConformingTransformerV2(tableConfig, schema), new DataTypeTransformer(tableConfig, schema), new TimeValidationTransformer(tableConfig, schema), new SpecialValueTransformer(schema), new NullValueTransformer(tableConfig, schema), new SanitizationTransformer(schema)).filter(t -> !t.isNoOp()) .collect(Collectors.toList()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java index 0d01e68f35..b9cfdce5a8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java @@ -174,7 +174,7 @@ public class SchemaConformingTransformer implements RecordTransformer { /** * @return The field type for the given extras field */ - private static DataType getAndValidateExtrasFieldType(Schema schema, @Nonnull String extrasFieldName) { + static DataType getAndValidateExtrasFieldType(Schema schema, @Nonnull String extrasFieldName) { FieldSpec fieldSpec =
schema.getFieldSpecFor(extrasFieldName); Preconditions.checkState(null != fieldSpec, "Field '%s' doesn't exist in schema", extrasFieldName); DataType fieldDataType = fieldSpec.getDataType(); @@ -250,7 +250,7 @@ public class SchemaConformingTransformer implements RecordTransformer { * @param subKeys Returns the sub-keys * @throws IllegalArgumentException if any sub-key is empty */ - private static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys) + static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys) throws IllegalArgumentException { int subKeyBeginIdx = 0; int subKeyEndIdx = firstKeySeparatorIdx; @@ -511,7 +511,16 @@ class ExtraFieldsContainer { if (null == _indexableExtras) { _indexableExtras = new HashMap<>(); } - _indexableExtras.put(key, value); + if (key == null && value instanceof Map) { + // If the key is null, it means that the value is a map that should be merged with the indexable extras + _indexableExtras.putAll((Map<String, Object>) value); + } else if (_indexableExtras.containsKey(key) && _indexableExtras.get(key) instanceof Map && value instanceof Map) { + // If the key already exists in the indexable extras and both the existing value and the new value are maps, + // merge the two maps + ((Map<String, Object>) _indexableExtras.get(key)).putAll((Map<String, Object>) value); + } else { + _indexableExtras.put(key, value); + } } /** @@ -524,7 +533,17 @@ class ExtraFieldsContainer { if (null == _unindexableExtras) { _unindexableExtras = new HashMap<>(); } - _unindexableExtras.put(key, value); + if (key == null && value instanceof Map) { + // If the key is null, it means that the value is a map that should be merged with the unindexable extras + _unindexableExtras.putAll((Map<String, Object>) value); + } else if (_unindexableExtras.containsKey(key) && _unindexableExtras.get(key) instanceof Map + && value instanceof Map) { + // If the key already exists in the unindexable extras and both the existing value and the new value are maps, + // merge the two maps + ((Map<String, Object>) _unindexableExtras.get(key)).putAll((Map<String, Object>) value); + } else { + _unindexableExtras.put(key, value); + } } /** @@ -542,4 +561,8 @@ class ExtraFieldsContainer { addUnindexableEntry(key, childUnindexableFields); } } + + public void addChild(ExtraFieldsContainer child) { + addChild(null, child); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java new file mode 100644 index 0000000000..5471f784bc --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java @@ -0,0 +1,727 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.recordtransformer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.pinot.common.metrics.ServerGauge; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.utils.Base64Utils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.metrics.PinotMeter; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This transformer evolves from {@link SchemaConformingTransformer} and is designed to support extra cases for + * better text searching: + * - Support overlapping schema fields, in which case it could support schema columns "a" and "a.b" at the same + * time. Only primitive type fields are allowed to hold the value. + * - Extract flattened key-value pairs as mergedTextIndex for better text searching. + * - Add shingle index tokenization functionality for extremely large text fields. + * <p> + * For example, consider this record: + * <pre> + * { + * "a": 1, + * "b": "2", + * "c": { + * "d": 3, + * "e_noindex": 4, + * "f_noindex": { + * "g": 5 + * }, + * "x": { + * "y": 9, + * "z_noindex": 10 + * } + * }, + * "h_noindex": "6", + * "i_noindex": { + * "j": 7, + * "k": 8 + * } + * } + * </pre> + * And let's say the table's schema contains these fields: + * <ul> + * <li>a</li> + * <li>c</li> + * <li>c.d</li> + * </ul> + * <p> + * The record would be transformed into the following (refer to {@link SchemaConformingTransformerV2Config} for + * default constant values): + * <pre> + * { + * "a": 1, + * "c": null, + * "c.d": 3, + * "json_data": { + * "b": "2", + * "c": { + * "x": { + * "y": 9 + * } + * } + * }, + * "json_data_no_idx": { + * "c": { + * "e_noindex": 4, + * "f_noindex": { + * "g": 5 + * }, + * "x": { + * "z_noindex": 10 + * } + * }, + * "h_noindex": "6", + * "i_noindex": { + * "j": 7, + * "k": 8 + * } + * }, + * "__mergedTextIndex": [ + * "1:a", "2:b", "3:c.d", "9:c.x.y" + * ] + * } + * </pre> + * <p> + * The contents of "__mergedTextIndex" can be filtered and manipulated based on the configuration in + * {@link SchemaConformingTransformerV2Config}.
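+ * <p>
+ * For illustration only, a minimal ingestion config enabling this transformer might look like the following sketch
+ * (the JSON property names are assumed to mirror the {@link SchemaConformingTransformerV2Config} fields exercised in
+ * this commit's tests; treat them as illustrative rather than authoritative):
+ * <pre>
+ * "ingestionConfig": {
+ *   "schemaConformingTransformerV2Config": {
+ *     "indexableExtrasField": "json_data",
+ *     "unindexableExtrasField": "json_data_no_idx",
+ *     "unindexableFieldSuffix": "_noindex",
+ *     "mergedTextIndexField": "__mergedTextIndex"
+ *   }
+ * }
+ * </pre>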
+ */ +public class SchemaConformingTransformerV2 implements RecordTransformer { + private static final Logger _logger = LoggerFactory.getLogger(SchemaConformingTransformerV2.class); + private static final int MAXIMUM_LUCENE_DOCUMENT_SIZE = 32766; + private static final String MIN_DOCUMENT_LENGTH_DESCRIPTION = + "key length + `:` + shingle index overlap length + one non-overlap char"; + + private final boolean _continueOnError; + private final SchemaConformingTransformerV2Config _transformerConfig; + private final DataType _indexableExtrasFieldType; + private final DataType _unindexableExtrasFieldType; + private final DimensionFieldSpec _mergedTextIndexFieldSpec; + @Nullable + ServerMetrics _serverMetrics = null; + private SchemaTreeNode _schemaTree; + @Nullable + private PinotMeter _realtimeMergedTextIndexTruncatedDocumentSizeMeter = null; + private String _tableName; + private long _mergedTextIndexDocumentBytesCount = 0L; + private long _mergedTextIndexDocumentCount = 0L; + + public SchemaConformingTransformerV2(TableConfig tableConfig, Schema schema) { + if (null == tableConfig.getIngestionConfig() || null == tableConfig.getIngestionConfig() .getSchemaConformingTransformerV2Config()) { + _continueOnError = false; + _transformerConfig = null; + _indexableExtrasFieldType = null; + _unindexableExtrasFieldType = null; + _mergedTextIndexFieldSpec = null; + return; + } + + _continueOnError = tableConfig.getIngestionConfig().isContinueOnError(); + _transformerConfig = tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config(); + String indexableExtrasFieldName = _transformerConfig.getIndexableExtrasField(); + _indexableExtrasFieldType = + indexableExtrasFieldName == null ? null : SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, + indexableExtrasFieldName); + String unindexableExtrasFieldName = _transformerConfig.getUnindexableExtrasField(); + _unindexableExtrasFieldType = + unindexableExtrasFieldName == null ? null : SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, + unindexableExtrasFieldName); + _mergedTextIndexFieldSpec = schema.getDimensionSpec(_transformerConfig.getMergedTextIndexField()); + _tableName = tableConfig.getTableName(); + _schemaTree = validateSchemaAndCreateTree(schema, _transformerConfig); + _serverMetrics = ServerMetrics.get(); + } + + /** + * Validates the schema against the given transformer's configuration. + */ + public static void validateSchema(@Nonnull Schema schema, + @Nonnull SchemaConformingTransformerV2Config transformerConfig) { + validateSchemaFieldNames(schema.getPhysicalColumnNames(), transformerConfig); + + String indexableExtrasFieldName = transformerConfig.getIndexableExtrasField(); + if (null != indexableExtrasFieldName) { + SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, indexableExtrasFieldName); + } + String unindexableExtrasFieldName = transformerConfig.getUnindexableExtrasField(); + if (null != unindexableExtrasFieldName) { + SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName); + } + + validateSchemaAndCreateTree(schema, transformerConfig); + } + + /** + * Heuristic filter to detect whether a byte array is longer than a specified length and contains only base64 + * characters so that we treat it as encoded binary data.
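+ * For example (illustrative inputs, not taken from the test suite):
+ * <pre>
+ *   base64ValueFilter("SGVsbG8gd29ybGQh".getBytes(), 10)  // true: 16 chars, all within the base64 alphabet
+ *   base64ValueFilter("hello world!".getBytes(), 10)      // false: ' ' and '!' are not base64 characters
+ * </pre>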
@param bytes array to check + * @param minLength byte array shorter than this length will not be treated as encoded binary data + * @return true if the input bytes are base64 encoded binary data by the heuristic above, false otherwise + */ + public static boolean base64ValueFilter(final byte[] bytes, int minLength) { + return bytes.length >= minLength && Base64Utils.isBase64IgnoreTrailingPeriods(bytes); + } + + /** + * Validates that none of the schema fields have names that conflict with the transformer's configuration. + */ + private static void validateSchemaFieldNames(Set<String> schemaFields, + SchemaConformingTransformerV2Config transformerConfig) { + // Validate that none of the columns in the schema end with unindexableFieldSuffix + String unindexableFieldSuffix = transformerConfig.getUnindexableFieldSuffix(); + if (null != unindexableFieldSuffix) { + for (String field : schemaFields) { + Preconditions.checkState(!field.endsWith(unindexableFieldSuffix), "Field '%s' has no-index suffix '%s'", field, + unindexableFieldSuffix); + } + } + + // Validate that none of the columns in the schema overlap with the fields in fieldPathsToDrop + Set<String> fieldPathsToDrop = transformerConfig.getFieldPathsToDrop(); + if (null != fieldPathsToDrop) { + Set<String> fieldIntersection = new HashSet<>(schemaFields); + fieldIntersection.retainAll(fieldPathsToDrop); + Preconditions.checkState(fieldIntersection.isEmpty(), "Fields in schema overlap with fieldPathsToDrop"); + } + } + + /** + * Validates the schema with a {@link SchemaConformingTransformerV2Config} instance and creates a tree representing + * the fields in the schema to be used when transforming input records. Refer to {@link SchemaTreeNode} for details. + * @throws IllegalArgumentException if schema validation fails in: + * <ul> + * <li>One of the fields in the schema has a name which, when interpreted as a JSON path, corresponds to an object + * with an empty sub-key.
E.g., the field name "a..b" corresponds to the JSON {"a": {"": {"b": ...}}}</li> + * </ul> + */ + private static SchemaTreeNode validateSchemaAndCreateTree(@Nonnull Schema schema, + @Nonnull SchemaConformingTransformerV2Config transformerConfig) + throws IllegalArgumentException { + Set<String> schemaFields = schema.getPhysicalColumnNames(); + Map<String, String> jsonKeyPathToColumnNameMap = new HashMap<>(); + for (Map.Entry<String, String> entry : transformerConfig.getColumnNameToJsonKeyPathMap().entrySet()) { + String columnName = entry.getKey(); + String jsonKeyPath = entry.getValue(); + schemaFields.remove(columnName); + schemaFields.add(jsonKeyPath); + jsonKeyPathToColumnNameMap.put(jsonKeyPath, columnName); + } + + SchemaTreeNode rootNode = new SchemaTreeNode("", null, schema); + List<String> subKeys = new ArrayList<>(); + for (String field : schemaFields) { + SchemaTreeNode currentNode = rootNode; + int keySeparatorIdx = field.indexOf(JsonUtils.KEY_SEPARATOR); + if (-1 == keySeparatorIdx) { + // Not a flattened key + currentNode = rootNode.getAndCreateChild(field, schema); + } else { + subKeys.clear(); + SchemaConformingTransformer.getAndValidateSubKeys(field, keySeparatorIdx, subKeys); + for (String subKey : subKeys) { + SchemaTreeNode childNode = currentNode.getAndCreateChild(subKey, schema); + currentNode = childNode; + } + } + currentNode.setColumn(jsonKeyPathToColumnNameMap.get(field)); + } + + return rootNode; + } + + @Override + public boolean isNoOp() { + return null == _transformerConfig; + } + + @Nullable + @Override + public GenericRow transform(GenericRow record) { + GenericRow outputRecord = new GenericRow(); + Map<String, Object> mergedTextIndexMap = new HashMap<>(); + + try { + Deque<String> jsonPath = new ArrayDeque<>(); + ExtraFieldsContainer extraFieldsContainer = + new ExtraFieldsContainer(null != _transformerConfig.getUnindexableExtrasField()); + for (Map.Entry<String, Object> recordEntry : record.getFieldToValueMap().entrySet()) { + String recordKey = recordEntry.getKey(); + Object recordValue = recordEntry.getValue(); + jsonPath.addLast(recordKey); + ExtraFieldsContainer currentFieldsContainer = + processField(_schemaTree, jsonPath, recordValue, true, outputRecord, mergedTextIndexMap); + extraFieldsContainer.addChild(currentFieldsContainer); + jsonPath.removeLast(); + } + putExtrasField(_transformerConfig.getIndexableExtrasField(), _indexableExtrasFieldType, + extraFieldsContainer.getIndexableExtras(), outputRecord); + putExtrasField(_transformerConfig.getUnindexableExtrasField(), _unindexableExtrasFieldType, + extraFieldsContainer.getUnindexableExtras(), outputRecord); + + // Generate merged text index + if (null != _mergedTextIndexFieldSpec && !mergedTextIndexMap.isEmpty()) { + List<String> luceneDocuments = getLuceneDocumentsFromMergedTextIndexMap(mergedTextIndexMap); + if (_mergedTextIndexFieldSpec.isSingleValueField()) { + outputRecord.putValue(_transformerConfig.getMergedTextIndexField(), String.join(" ", luceneDocuments)); + } else { + outputRecord.putValue(_transformerConfig.getMergedTextIndexField(), luceneDocuments); + } + } + } catch (Exception e) { + if (!_continueOnError) { + throw e; + } + _logger.error("Couldn't transform record: {}", record.toString(), e); + outputRecord.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true); + } + + return outputRecord; + } + + /** + * The method traverses the record and schema tree at the same time. 
It would check the specs of record key/value + * pairs with the corresponding schema tree node and {@link SchemaConformingTransformerV2Config}. Finally drop or put + * them into the output record with the following logic: + * Take this example: + * { + * "a": 1, + * "b": { + * "c": 2, + * "d": 3, + * "d_noIdx": 4 + * }, + * "b_noIdx": { + * "c": 5, + * "d": 6 + * } + * } + * with columns "a", "b", "b.c" in the schema + * There are two types of output: + * - flattened keys with values, e.g., + * - keyPath as column and value as leaf node, e.g., "a": 1, "b.c": 2. However, "b" is not a leaf node, so it would + * be skipped + * - __mergedTextIndex storing ["1:a", "2:b.c", "3:b.d"] as a string array + * - structured Json format, e.g., + * - indexableFields/json_data: {"a": 1, "b": {"c": 2, "d": 3}} + * - unindexableFields/json_data_noIdx: {"b": {"d_noIdx": 4} ,"b_noIdx": {"c": 5, "d": 6}} + * Expected behavior: + * - If the current key is special, it is added to the outputRecord and its subtree is skipped + * - If the keyJsonPath is in fieldPathsToDrop, it and its subtree would be skipped + * - At leaf node (base case in recursion): + * - Parse keyPath and value and add as flattened result to outputRecord + * - Return structured fields as ExtraFieldsContainer + * (a leaf node is defined as a node whose value is not of "Map" type; the value may still be a collection or array + * of "Map"s, but for simplicity we still treat it as a leaf node and do not traverse its children) + * - For non-leaf node + * - Construct ExtraFieldsContainer based on children's result and return + * + * @param parentNode The parent node in the schema tree which might or might not have a child with the given key. If + * parentNode is null, it means the current key is out of the schema tree. + * @param jsonPath The key json path split by "." + * @param value The value of the current field + * @param isIndexable Whether the current field is indexable + * @param outputRecord The output record updated during traversal + * @param mergedTextIndexMap The merged text index map updated during traversal + * @return an ExtraFieldsContainer carrying the indexable and unindexable fields of the current node as well as its + * subtree + */ + private ExtraFieldsContainer processField(SchemaTreeNode parentNode, Deque<String> jsonPath, Object value, + boolean isIndexable, GenericRow outputRecord, Map<String, Object> mergedTextIndexMap) { + // Common variables + boolean storeIndexableExtras = _transformerConfig.getIndexableExtrasField() != null; + boolean storeUnindexableExtras = _transformerConfig.getUnindexableExtrasField() != null; + String key = jsonPath.peekLast(); + ExtraFieldsContainer extraFieldsContainer = new ExtraFieldsContainer(storeUnindexableExtras); + + // Base case + if (StreamDataDecoderImpl.isSpecialKeyType(key) || GenericRow.isSpecialKeyType(key)) { + outputRecord.putValue(key, value); + return extraFieldsContainer; + } + + String keyJsonPath = String.join(".", jsonPath); + if (_transformerConfig.getFieldPathsToPreserveInput().contains(keyJsonPath)) { + outputRecord.putValue(keyJsonPath, value); + return extraFieldsContainer; + } + + Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop(); + if (null != fieldPathsToDrop && fieldPathsToDrop.contains(keyJsonPath)) { + return extraFieldsContainer; + } + + SchemaTreeNode currentNode = parentNode == null ?
null : parentNode.getChild(key); + String unindexableFieldSuffix = _transformerConfig.getUnindexableFieldSuffix(); + isIndexable = isIndexable && (null == unindexableFieldSuffix || !key.endsWith(unindexableFieldSuffix)); + if (!(value instanceof Map)) { + // leaf node + if (!isIndexable) { + extraFieldsContainer.addUnindexableEntry(key, value); + } else { + if (null != currentNode && currentNode.isColumn()) { + // In schema + outputRecord.putValue(currentNode.getColumnName(), currentNode.getValue(value)); + if (_transformerConfig.getFieldsToDoubleIngest().contains(keyJsonPath)) { + extraFieldsContainer.addIndexableEntry(key, value); + } + mergedTextIndexMap.put(keyJsonPath, value); + } else { + // Out of schema + if (storeIndexableExtras) { + extraFieldsContainer.addIndexableEntry(key, value); + mergedTextIndexMap.put(keyJsonPath, value); + } + } + } + return extraFieldsContainer; + } + // Traverse the subtree + Map<String, Object> valueAsMap = (Map<String, Object>) value; + for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) { + jsonPath.addLast(entry.getKey()); + ExtraFieldsContainer childContainer = + processField(currentNode, jsonPath, entry.getValue(), isIndexable, outputRecord, mergedTextIndexMap); + extraFieldsContainer.addChild(key, childContainer); + jsonPath.removeLast(); + } + return extraFieldsContainer; + } + + /** + * Generate a Lucene document based on the provided key-value pair. + * The index document follows this format: "val:key". + * @param kv used to generate text index documents + * @param indexDocuments a list to store the generated index documents + * @param mergedTextIndexDocumentMaxLength the maximum document length, which we enforce via truncation during + * document generation + */ + public void generateTextIndexLuceneDocument(Map.Entry<String, Object> kv, List<String> indexDocuments, + Integer mergedTextIndexDocumentMaxLength) { + String key = kv.getKey(); + String val; + // To avoid redundant leading and trailing '"', only convert to JSON string if the value is a list or an array + if (kv.getValue() instanceof Collection || kv.getValue() instanceof Object[]) { + try { + val = JsonUtils.objectToString(kv.getValue()); + } catch (JsonProcessingException e) { + val = kv.getValue().toString(); + } + } else { + val = kv.getValue().toString(); + } + + // TODO: theoretically, the key length + 1 could cause integer overflow. But in reality, upstream message size + // limit usually cannot reach that high. We should revisit this if we see any issue.
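+ // Worked example of the truncation below (illustrative numbers): with mergedTextIndexDocumentMaxLength = 10 and
+ // key = "key", valueTruncationLength = 10 - 1 - 3 = 6, so a value of "0123456789" is cut to "012345" and the
+ // emitted document is "012345:key".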
+ if (key.length() + 1 > MAXIMUM_LUCENE_DOCUMENT_SIZE) { + _logger.error("The provided key is too long, text index document cannot be truncated"); + return; + } + + // Truncate the value to ensure the generated index document is less than or equal to + // mergedTextIndexDocumentMaxLength + // The value length should be the mergedTextIndexDocumentMaxLength minus ":" character (length 1) minus key length + int valueTruncationLength = mergedTextIndexDocumentMaxLength - 1 - key.length(); + if (val.length() > valueTruncationLength) { + _realtimeMergedTextIndexTruncatedDocumentSizeMeter = _serverMetrics .addMeteredTableValue(_tableName, ServerMeter.REALTIME_MERGED_TEXT_IDX_TRUNCATED_DOCUMENT_SIZE, key.length() + 1 + val.length(), _realtimeMergedTextIndexTruncatedDocumentSizeMeter); + val = val.substring(0, valueTruncationLength); + } + + _mergedTextIndexDocumentBytesCount += key.length() + 1 + val.length(); + _mergedTextIndexDocumentCount += 1; + _serverMetrics.setValueOfTableGauge(_tableName, ServerGauge.REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN, _mergedTextIndexDocumentBytesCount / _mergedTextIndexDocumentCount); + + indexDocuments.add(val + ":" + key); + } + + /** + * Implement shingling for the merged text index based on the provided key-value pair. + * Each shingled index document retains the format of a standard index document: "val:key". However, "val" now + * denotes a sliding window of characters on the value. The total length of each shingled index document + * (key length + shingled value length + 1) must be less than or equal to shingleIndexMaxLength. The starting index + * of the sliding window for the value is increased by shingleIndexOverlapLength for every new shingled document. + * All shingle index documents, except for the last one, should have the maximum possible length. If the minimum + * document length (shingling overlap length + key length + 1) exceeds the maximum Lucene document size + * (MAXIMUM_LUCENE_DOCUMENT_SIZE), shingling is disabled, and the value is truncated to match the maximum Lucene + * document size. If shingleIndexMaxLength is lower than the required minimum document length and also lower than + * the maximum Lucene document size, shingleIndexMaxLength is adjusted to match the maximum Lucene document size. + * + * Note that the most important parameter, the shingleIndexOverlapLength, is the maximum search length that will yield + * results with 100% accuracy. + * + * Example: key-> "key", value-> "0123456789ABCDEF", max length: 10, shingling overlap length: 3 + * Generated documents: + * 012345:key + * 345678:key + * 6789AB:key + * 9ABCDE:key + * CDEF:key + * Any query with a length of 7 will yield no results, such as "0123456" or "6789ABC". + * Any query with a length of 3 will yield results with 100% accuracy (i.e. is always guaranteed to be searchable). + * Any query with a length between 4 and 6 (inclusive) has indeterminate accuracy. + * E.g. for queries with length 5, "12345", "789AB" will hit, while "23456" will miss. + * + * @param kv used to generate shingle text index documents + * @param shingleIndexDocuments a list to store the generated shingle index documents + * @param shingleIndexMaxLength the maximum length of each shingle index document. Needs to be greater than the + * length of the key plus shingleIndexOverlapLength + 1, and must be lower than or + * equal to MAXIMUM_LUCENE_DOCUMENT_SIZE. + * @param shingleIndexOverlapLength the number of characters in the kv-pair's value shared by two adjacent shingle + * index documents.
If null, the overlap length will be defaulted to half of the max + * document length. + */ + public void generateShingleTextIndexDocument(Map.Entry<String, Object> kv, List<String> shingleIndexDocuments, + int shingleIndexMaxLength, int shingleIndexOverlapLength) { + String key = kv.getKey(); + String val; + // To avoid redundant leading and trailing '"', only convert to JSON string if the value is a list or an array + if (kv.getValue() instanceof Collection || kv.getValue() instanceof Object[]) { + try { + val = JsonUtils.objectToString(kv.getValue()); + } catch (JsonProcessingException e) { + val = kv.getValue().toString(); + } + } else { + val = kv.getValue().toString(); + } + final int valLength = val.length(); + final int documentSuffixLength = key.length() + 1; + final int minDocumentLength = documentSuffixLength + shingleIndexOverlapLength + 1; + + if (shingleIndexOverlapLength >= valLength) { + if (_logger.isDebugEnabled()) { + _logger.debug("The shingleIndexOverlapLength " + shingleIndexOverlapLength + " is not shorter than the value length " + valLength + ". Shingling will not be applied since only one document will be generated."); + } + generateTextIndexLuceneDocument(kv, shingleIndexDocuments, shingleIndexMaxLength); + return; + } + + if (minDocumentLength > MAXIMUM_LUCENE_DOCUMENT_SIZE) { + _logger.debug("The minimum document length " + minDocumentLength + " (" + MIN_DOCUMENT_LENGTH_DESCRIPTION + ") " + " exceeds the limit of maximum Lucene document size " + MAXIMUM_LUCENE_DOCUMENT_SIZE + ". Value will be " + "truncated and shingling will not be applied."); + generateTextIndexLuceneDocument(kv, shingleIndexDocuments, shingleIndexMaxLength); + return; + } + + // This logging becomes expensive if the user accidentally sets a very low shingleIndexMaxLength + if (shingleIndexMaxLength < minDocumentLength) { + _logger.debug("The shingleIndexMaxLength " + shingleIndexMaxLength + " is smaller than the minimum document " + "length " + minDocumentLength + " (" + MIN_DOCUMENT_LENGTH_DESCRIPTION + "). Increasing the " + "shingleIndexMaxLength to maximum Lucene document size " + MAXIMUM_LUCENE_DOCUMENT_SIZE + "."); + shingleIndexMaxLength = MAXIMUM_LUCENE_DOCUMENT_SIZE; + } + + // The shingle window slide length is the number of value characters we advance by on every iteration. + // We ensure shingleIndexMaxLength >= minDocumentLength so that shingleWindowSlideLength >= 1. + int shingleWindowSlideLength = shingleIndexMaxLength - shingleIndexOverlapLength - documentSuffixLength; + + // Generate shingle index documents + // When starting_idx + shingleIndexOverlapLength >= valLength, there are no new characters to capture, so we stop + // the shingle document generation loop. + // We ensure that shingleIndexOverlapLength < valLength so that this loop will be entered at least once.
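+ // Worked example of the loop below, matching the Javadoc above: key = "key" (documentSuffixLength = 4),
+ // shingleIndexMaxLength = 10, shingleIndexOverlapLength = 3, val = "0123456789ABCDEF" (valLength = 16).
+ // shingleWindowSlideLength = 10 - 3 - 4 = 3, so windows start at i = 0, 3, 6, 9, 12 and produce
+ // "012345:key", "345678:key", "6789AB:key", "9ABCDE:key", "CDEF:key".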
+ for (int i = 0; i + shingleIndexOverlapLength < valLength; i += shingleWindowSlideLength) { + String documentValStr = val.substring(i, Math.min(i + shingleIndexMaxLength - documentSuffixLength, valLength)); + String shingleIndexDocument = documentValStr + ":" + key; + shingleIndexDocuments.add(shingleIndexDocument); + _mergedTextIndexDocumentBytesCount += shingleIndexDocument.length(); + ++_mergedTextIndexDocumentCount; + } + _serverMetrics.setValueOfTableGauge(_tableName, ServerGauge.REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN, _mergedTextIndexDocumentBytesCount / _mergedTextIndexDocumentCount); + } + + /** + * Converts (if necessary) and adds the given extras field to the output record + */ + private void putExtrasField(String fieldName, DataType fieldType, Map<String, Object> field, + GenericRow outputRecord) { + if (null == field) { + return; + } + + switch (fieldType) { + case JSON: + outputRecord.putValue(fieldName, field); + break; + case STRING: + try { + outputRecord.putValue(fieldName, JsonUtils.objectToString(field)); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to convert '" + fieldName + "' to string", e); + } + break; + default: + throw new UnsupportedOperationException("Cannot convert '" + fieldName + "' to " + fieldType.name()); + } + } + + private List<String> getLuceneDocumentsFromMergedTextIndexMap(Map<String, Object> mergedTextIndexMap) { + final Integer mergedTextIndexDocumentMaxLength = _transformerConfig.getMergedTextIndexDocumentMaxLength(); + final @Nullable + Integer mergedTextIndexShinglingOverlapLength = _transformerConfig.getMergedTextIndexShinglingOverlapLength(); + List<String> luceneDocuments = new ArrayList<>(); + mergedTextIndexMap.entrySet().stream().filter(kv -> null != kv.getKey() && null != kv.getValue()) + .filter(kv -> !_transformerConfig.getMergedTextIndexPathToExclude().contains(kv.getKey())).filter( kv -> !base64ValueFilter(kv.getValue().toString().getBytes(), _transformerConfig.getMergedTextIndexBinaryDocumentDetectionMinLength())).filter( kv -> _transformerConfig.getMergedTextIndexSuffixToExclude().stream() .noneMatch(suffix -> kv.getKey().endsWith(suffix))).forEach(kv -> { + if (null == mergedTextIndexShinglingOverlapLength) { + generateTextIndexLuceneDocument(kv, luceneDocuments, mergedTextIndexDocumentMaxLength); + } else { + generateShingleTextIndexDocument(kv, luceneDocuments, mergedTextIndexDocumentMaxLength, mergedTextIndexShinglingOverlapLength); + } + }); + return luceneDocuments; + } +} + +/** + * SchemaTreeNode represents the tree node when we construct the schema tree. A node can be either a leaf node or + * a non-leaf node, and both types of nodes can map to a column in the schema. + * For example, the schema with columns a, b, c, d.e, d.f, x.y, x.y.z, x.y.w will have the following tree structure: + * root -- a* + * -- b* + * -- c* + * -- d -- e* + * -- f* + * -- x -- y* -- z* + * -- w* + * where a node marked with "*" represents a valid column in the schema.
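+ * For example, the node reached via "x" -> "y" has getJsonKeyPath() == "x.y"; calling setColumn(null) on it marks it
+ * as a column named after that path, while setColumn("someColumn") (a hypothetical column name) maps it to a schema
+ * column configured through columnNameToJsonKeyPathMap.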
+ */ +class SchemaTreeNode { + private boolean _isColumn; + private Map<String, SchemaTreeNode> _children; + // Taking the example of key "x.y.z", the keyName will be "z" and the parentPath will be "x.y" + // Root node would have keyName as "" and parentPath as null + // Root node's children will have keyName as the first level key and parentPath as "" + @Nonnull + private String _keyName; + @Nullable + private String _columnName; + @Nullable + private String _parentPath; + private FieldSpec _fieldSpec; + + public SchemaTreeNode(String keyName, String parentPath, Schema schema) { + _keyName = keyName; + _parentPath = parentPath; + _fieldSpec = schema.getFieldSpecFor(getJsonKeyPath()); + _children = new HashMap<>(); + } + + public boolean isColumn() { + return _isColumn; + } + + public void setColumn(String columnName) { + if (columnName == null) { + _columnName = getJsonKeyPath(); + } else { + _columnName = columnName; + } + _isColumn = true; + } + + public boolean hasChild(String key) { + return _children.containsKey(key); + } + + /** + * If the current node does not have a child with the given key, add a child node and return it. + * If the child node already exists, return the existing child node. + * @param key the sub-key of the child node + * @return the new or existing child node + */ + public SchemaTreeNode getAndCreateChild(String key, Schema schema) { + SchemaTreeNode child = _children.get(key); + if (child == null) { + child = new SchemaTreeNode(key, getJsonKeyPath(), schema); + _children.put(key, child); + } + return child; + } + + public SchemaTreeNode getChild(String key) { + return _children.get(key); + } + + public String getKeyName() { + return _keyName; + } + + public String getColumnName() { + return _columnName; + } + + public Object getValue(Object value) { + // In {@link DataTypeTransformer}, a single-value field does not allow the input value to be a + // collection or array. To prevent the error, we serialize the value to a string if the field is a string type. + if (_fieldSpec != null && _fieldSpec.getDataType() == DataType.STRING && _fieldSpec.isSingleValueField()) { + try { + if (value instanceof Collection) { + return JsonUtils.objectToString(value); + } + if (value instanceof Object[]) { + return JsonUtils.objectToString(Arrays.asList((Object[]) value)); + } + } catch (JsonProcessingException e) { + return value.toString(); + } + } + return value; + } + + public String getJsonKeyPath() { + if (_parentPath == null || _parentPath.isEmpty()) { + return _keyName; + } + return _parentPath + JsonUtils.KEY_SEPARATOR + _keyName; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/Base64Utils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/Base64Utils.java new file mode 100644 index 0000000000..12a1652a24 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/Base64Utils.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils; + +import org.apache.commons.codec.binary.Base64; + +/** + * Simple wrapper class over codec's Base64 implementation to handle Pinot-specific Base64 encoded binary data. + */ +public class Base64Utils extends Base64 { + public static boolean isBase64IgnoreWhiteSpace(byte[] arrayOctet) { + return isBase64(arrayOctet); + } + + public static boolean isBase64IgnoreTrailingPeriods(byte[] arrayOctet) { + int i = arrayOctet.length - 1; + while (i >= 0 && '.' == arrayOctet[i]) { + --i; + } + while (i >= 0) { + if (!isBase64(arrayOctet[i])) { + return false; + } + --i; + } + return true; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java index 203d7d930a..556b025855 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java @@ -310,7 +310,8 @@ public final class IngestionUtils { public static Set<String> getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig, Schema schema) { Set<String> fieldsForRecordExtractor = new HashSet<>(); - if (null != ingestionConfig && null != ingestionConfig.getSchemaConformingTransformerConfig()) { + if (null != ingestionConfig && (null != ingestionConfig.getSchemaConformingTransformerConfig() + || null != ingestionConfig.getSchemaConformingTransformerV2Config())) { // The SchemaConformingTransformer requires that all fields are extracted, indicated by returning an empty set // here. Compared to extracting the fields specified below, extracting all fields should be a superset. 
return fieldsForRecordExtractor; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 8d31f5d399..14c4040a60 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -46,6 +46,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; +import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformerV2; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; @@ -77,6 +78,7 @@ import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.FieldSpec; @@ -574,6 +576,12 @@ public final class TableConfigUtils { if (null != schemaConformingTransformerConfig && null != schema) { SchemaConformingTransformer.validateSchema(schema, schemaConformingTransformerConfig); } + + SchemaConformingTransformerV2Config schemaConformingTransformerV2Config = + ingestionConfig.getSchemaConformingTransformerV2Config(); + if (null != schemaConformingTransformerV2Config && null != schema) { + SchemaConformingTransformerV2.validateSchema(schema, schemaConformingTransformerV2Config); + } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java new file mode 100644 index 0000000000..6189f14d42 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java @@ -0,0 +1,934 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.recordtransformer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.NumericNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.fail; + + +public class SchemaConformingTransformerV2Test { + private static final String INDEXABLE_EXTRAS_FIELD_NAME = "json_data"; + private static final String UNINDEXABLE_EXTRAS_FIELD_NAME = "json_data_no_idx"; + private static final String UNINDEXABLE_FIELD_SUFFIX = "_noIndex"; + private static final String MERGED_TEXT_INDEX_FIELD_NAME = "__mergedTextIndex"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final JsonNodeFactory N = OBJECT_MAPPER.getNodeFactory(); + private static final String TEST_JSON_ARRAY_FIELD_NAME = "arrayField"; + private static final String TEST_JSON_NULL_FIELD_NAME = "nullField"; + private static final String TEST_JSON_STRING_FIELD_NAME = "stringField"; + private static final String TEST_JSON_MAP_FIELD_NAME = "mapField"; + private static final String TEST_JSON_MAP_NO_IDX_FIELD_NAME = "mapField_noIndex"; + private static final String TEST_JSON_NESTED_MAP_FIELD_NAME = "nestedFields"; + private static final String TEST_JSON_INT_NO_IDX_FIELD_NAME = "intField_noIndex"; + private static final String TEST_JSON_STRING_NO_IDX_FIELD_NAME = "stringField_noIndex"; + private static final ArrayNode TEST_JSON_ARRAY_NODE = N.arrayNode().add(0).add(1).add(2).add(3); + private static final NullNode TEST_JSON_NULL_NODE = N.nullNode(); + private static final TextNode TEST_JSON_STRING_NODE = N.textNode("a"); + private static final NumericNode TEST_INT_NODE = N.numberNode(9); + private static final TextNode TEST_JSON_STRING_NO_IDX_NODE = N.textNode("z"); + private static final CustomObjectNode TEST_JSON_MAP_NODE = + CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE); + private static final CustomObjectNode TEST_JSON_MAP_NO_IDX_NODE = + 
CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE); + private static final CustomObjectNode TEST_JSON_MAP_NODE_WITH_NO_IDX = + CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE); + static { + ServerMetrics.register(mock(ServerMetrics.class)); + } + private static final SchemaConformingTransformerV2 _RECORD_TRANSFORMER = + new SchemaConformingTransformerV2(createDefaultBasicTableConfig(), createDefaultSchema()); + + private static TableConfig createDefaultBasicTableConfig() { + IngestionConfig ingestionConfig = new IngestionConfig(); + SchemaConformingTransformerV2Config schemaConformingTransformerV2Config = + new SchemaConformingTransformerV2Config(true, INDEXABLE_EXTRAS_FIELD_NAME, true, UNINDEXABLE_EXTRAS_FIELD_NAME, + UNINDEXABLE_FIELD_SUFFIX, null, null, null, null, null, null, null, null); + ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config); + return new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig) + .build(); + } + + private static TableConfig createDefaultTableConfig(String indexableExtrasField, String unindexableExtrasField, + String unindexableFieldSuffix, Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve, + String mergedTextIndexField) { + IngestionConfig ingestionConfig = new IngestionConfig(); + SchemaConformingTransformerV2Config schemaConformingTransformerV2Config = + new SchemaConformingTransformerV2Config(indexableExtrasField != null, indexableExtrasField, + unindexableExtrasField != null, unindexableExtrasField, unindexableFieldSuffix, fieldPathsToDrop, + fieldPathsToPreserve, mergedTextIndexField, null, null, null, null, null); + ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config); + return new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig) + .build(); + } + + private static Schema createDefaultSchema() { + return createDefaultSchemaBuilder().addSingleValueDimension("intField", DataType.INT).build(); + } + + private static Schema.SchemaBuilder createDefaultSchemaBuilder() { + return new Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON) + .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON); + } + + @Test + public void testWithNoUnindexableFields() { + /* + { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a", + "mapField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a" + }, + "nestedField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a", + "mapField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a" + } + } + } + */ + final CustomObjectNode inputJsonNode = + CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)); + + CustomObjectNode expectedJsonNode; + Schema schema; + + // No dedicated columns, everything moved under 
INDEXABLE_EXTRAS_FIELD_NAME + /* + { + "json_data" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a", + "mapField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a" + }, + "nestedField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a", + "mapField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a" + } + } + } + } + */ + schema = createDefaultSchemaBuilder().build(); + expectedJsonNode = CustomObjectNode.create().set(INDEXABLE_EXTRAS_FIELD_NAME, inputJsonNode); + transformWithIndexableFields(schema, inputJsonNode, expectedJsonNode); + + // Three dedicated columns in schema, only two are populated, one ignored + /* + { + "arrayField":[0, 1, 2, 3], + "nestedFields.stringField":"a", + "<indexableExtras>":{ + "mapField": { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nullField":null, + "stringField":"a", + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + } + } + */ + schema = createDefaultSchemaBuilder().addMultiValueDimension(TEST_JSON_ARRAY_FIELD_NAME, DataType.INT) + .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING) + .build(); + expectedJsonNode = CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + + .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE) + .setAll(TEST_JSON_MAP_NODE.deepCopy().removeAndReturn(TEST_JSON_ARRAY_FIELD_NAME)) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME, CustomObjectNode.create() + .setAll(TEST_JSON_MAP_NODE.deepCopy().removeAndReturn(TEST_JSON_STRING_FIELD_NAME)) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE))); + transformWithIndexableFields(schema, inputJsonNode, expectedJsonNode); + + // 8 dedicated columns, only 6 are populated + /* + { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a", + "nestedField.arrayField" : [ 0, 1, 2, 3 ], + "nestedField.nullField" : null, + "nestedField.stringField" : "a", + "json_data" : { + "mapField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a" + }, + "nestedField" : { + "mapField" : { + "arrayField" : [ 0, 1, 2, 3 ], + "nullField" : null, + "stringField" : "a" + } + } + } + } + */ + schema = createDefaultSchemaBuilder().addMultiValueDimension(TEST_JSON_ARRAY_FIELD_NAME, DataType.INT) + .addSingleValueDimension(TEST_JSON_NULL_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_STRING_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.JSON) + .addMultiValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, DataType.INT) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_MAP_FIELD_NAME, DataType.JSON) + .build(); + expectedJsonNode = CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." 
+ TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE))); + transformWithIndexableFields(schema, inputJsonNode, expectedJsonNode); + } + + @Test + public void testWithUnindexableFieldsAndMergedTextIndex() { + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z" + }, + "mapField_noIndex":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z" + } + } + } + */ + final CustomObjectNode inputJsonNode = + CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX) + .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE) + .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)); + + CustomObjectNode expectedJsonNode; + CustomObjectNode expectedJsonNodeWithMergedTextIndex; + Schema.SchemaBuilder schemaBuilder; + + // No schema + schemaBuilder = createDefaultSchemaBuilder(); + /* + { + "indexableExtras":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + }, + "unindexableExtras":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + }, + "mapField_noIndex":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + }, + "nestedFields":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + } + } + }, + __mergedTextIndex: [ + "[0, 1, 2, 3]:arrayField", "a:stringField", + "[0, 1, 2, 3]:mapField.arrayField", "a:mapField.stringField", + "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField", + "[0, 1, 2, 3]:nestedFields.mapField.arrayField", "a:nestedFields.mapField.stringField", + ] + } + */ + expectedJsonNode = 
CustomObjectNode.create().set(INDEXABLE_EXTRAS_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE) + .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE))) + + .set(UNINDEXABLE_EXTRAS_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE) + .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE))); + transformWithUnIndexableFieldsAndMergedTextIndex(schemaBuilder.build(), inputJsonNode, expectedJsonNode); + + expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME, + N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:mapField.arrayField") + .add("a:mapField.stringField").add("[0,1,2,3]:nestedFields.arrayField").add("a:nestedFields.stringField") + .add("[0,1,2,3]:nestedFields.mapField.arrayField").add("a:nestedFields.mapField.stringField")); + transformWithUnIndexableFieldsAndMergedTextIndex( + schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), inputJsonNode, + expectedJsonNodeWithMergedTextIndex); + + // With schema, mapField is not indexed + schemaBuilder = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", DataType.INT) + .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME, DataType.JSON) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING); + /* + { + "arrayField":[0, 1, 2, 3], + "nestedFields.stringField":"a", + "indexableExtras":{ + "nullField":null, + "stringField":"a", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + }, + "unindexableExtras":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + }, + "mapField_noIndex":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + }, + "nestedFields":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + } + } + }, + __mergedTextIndex: [ + "[0, 1, 2, 3]:arrayField", "a:stringField", + "[0, 1, 2, 3]:mapField.arrayField", "a:mapField.stringField", + "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField", + "[0, 1, 2, 3]:nestedFields.mapField.arrayField", "a:nestedFields.mapField.stringField", + ] + } + */ + expectedJsonNode = CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." 
+ TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE) + .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE).set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE))) + + .set(UNINDEXABLE_EXTRAS_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE) + .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE))); + transformWithUnIndexableFieldsAndMergedTextIndex(schemaBuilder.build(), inputJsonNode, expectedJsonNode); + + expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME, + N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:mapField.arrayField") + .add("a:mapField.stringField").add("[0,1,2,3]:nestedFields.arrayField").add("a:nestedFields.stringField") + .add("[0,1,2,3]:nestedFields.mapField.arrayField").add("a:nestedFields.mapField.stringField")); + transformWithUnIndexableFieldsAndMergedTextIndex( + schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), inputJsonNode, + expectedJsonNodeWithMergedTextIndex); + + // With all fields in schema, but map field would not be indexed + schemaBuilder = createDefaultSchemaBuilder().addMultiValueDimension(TEST_JSON_ARRAY_FIELD_NAME, DataType.INT) + .addSingleValueDimension(TEST_JSON_NULL_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_STRING_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.JSON) + .addMultiValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, DataType.INT) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." 
+ TEST_JSON_MAP_FIELD_NAME, DataType.JSON); + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "nestedFields.arrayField":[0, 1, 2, 3], + "nestedFields.nullField":null, + "nestedFields.stringField":"a", + "indexableExtras":{ + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + }, + "nestedFields":{ + mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a" + } + } + }, + "unindexableExtras":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + }, + "mapField_noIndex":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + }, + "nestedFields":{ + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "intField_noIndex":9, + "string_noIndex":"z" + } + } + }, + __mergedTextIndex: [ + "[0, 1, 2, 3]:arrayField", "a:stringField", + "[0, 1, 2, 3]:mapField.arrayField", "a:mapField.stringField", + "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField", + "[0, 1, 2, 3]:nestedFields.mapField.arrayField", "a:nestedFields.mapField.stringField", + ] + } + */ + expectedJsonNode = CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + + .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE) + .set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE))) + + .set(UNINDEXABLE_EXTRAS_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE) + .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE))); + transformWithUnIndexableFieldsAndMergedTextIndex(schemaBuilder.build(), inputJsonNode, expectedJsonNode); + expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME, + N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:mapField.arrayField") + .add("a:mapField.stringField").add("[0,1,2,3]:nestedFields.arrayField").add("a:nestedFields.stringField") + .add("[0,1,2,3]:nestedFields.mapField.arrayField").add("a:nestedFields.mapField.stringField")); + transformWithUnIndexableFieldsAndMergedTextIndex( + schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), inputJsonNode, + expectedJsonNodeWithMergedTextIndex); + } + + @Test + public void testKeyValueTransformation() { + /* + { + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z" + }, + "mapField_noIndex":{ + "arrayField":[0, 1, 2, 3], + 
"nullField":null, + "stringField":"a", + }, + "nestedFields":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z", + "mapField":{ + "arrayField":[0, 1, 2, 3], + "nullField":null, + "stringField":"a", + "intField_noIndex":9, + "string_noIndex":"z" + } + } + } + */ + final CustomObjectNode inputJsonNode = + CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX) + .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME, + CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE) + .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE) + .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE) + .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE) + .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE) + .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)); + + CustomObjectNode expectedJsonNode; + CustomObjectNode expectedJsonNodeWithMergedTextIndex; + Schema.SchemaBuilder schemaBuilder; + + String destColumnName = "someMeaningfulName"; + // make array field as single value STRING, test the conversion function + // ignore the column nestedFields + // preserve the entire mapField value + // map the column someMeaningfulName to nestedFields.stringField + schemaBuilder = createDefaultSchemaBuilder().addSingleValueDimension("arrayField", DataType.STRING) + .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING) + .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME, DataType.JSON) + .addSingleValueDimension(destColumnName, DataType.STRING); + + Map<String, String> keyMapping = new HashMap<>() { + { + put(destColumnName, TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME); + } + }; + Set<String> pathToDrop = new HashSet<>() { + { + add(TEST_JSON_NESTED_MAP_FIELD_NAME + "." 
+ TEST_JSON_MAP_FIELD_NAME);
+       }
+     };
+     Set<String> pathToPreserve = new HashSet<>() {
+       {
+         add(TEST_JSON_MAP_FIELD_NAME);
+       }
+     };
+
+     /*
+     {
+       "arrayField":[0,1,2,3],
+       "nestedFields.stringField":"a",
+       "mapField":{
+         "arrayField":[0,1,2,3],
+         "nullField":null,
+         "stringField":"a",
+         "intField_noIndex":9,
+         "string_noIndex":"z"
+       },
+       "indexableExtras":{
+         "nullField":null,
+         "stringField":"a",
+         "nestedFields":{
+           "arrayField":[0, 1, 2, 3],
+           "nullField":null
+         }
+       },
+       "unindexableExtras":{
+         "intField_noIndex":9,
+         "string_noIndex":"z",
+         "mapField_noIndex":{
+           "arrayField":[0, 1, 2, 3],
+           "nullField":null,
+           "stringField":"a"
+         },
+         "nestedFields":{
+           "intField_noIndex":9,
+           "string_noIndex":"z"
+         }
+       },
+       __mergedTextIndex: [
+         "[0, 1, 2, 3]:arrayField", "a:stringField",
+         "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField"
+       ]
+     }
+     */
+     expectedJsonNode = CustomObjectNode.create()
+         .set(TEST_JSON_ARRAY_FIELD_NAME, N.textNode("[0,1,2,3]"))
+         .set(destColumnName, TEST_JSON_STRING_NODE)
+         .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
+         .set(INDEXABLE_EXTRAS_FIELD_NAME,
+             CustomObjectNode.create().set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+                 .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+                 .set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                     CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+                         .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)))
+
+         .set(UNINDEXABLE_EXTRAS_FIELD_NAME,
+             CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                 .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                 .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                     CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                         .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)));
+
+     expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME,
+         N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:nestedFields.arrayField")
+             .add("a:nestedFields.stringField"));
+     transformKeyValueTransformation(
+         schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), keyMapping,
+         pathToDrop, pathToPreserve, inputJsonNode, expectedJsonNodeWithMergedTextIndex);
+   }
+
+   private void transformWithIndexableFields(Schema schema, JsonNode inputRecordJsonNode,
+       JsonNode outputRecordJsonNode) {
+     testTransform(INDEXABLE_EXTRAS_FIELD_NAME, null, null, schema, null, null, null, inputRecordJsonNode.toString(),
+         outputRecordJsonNode.toString());
+   }
+
+   private void transformWithUnIndexableFieldsAndMergedTextIndex(Schema schema, JsonNode inputRecordJsonNode,
+       JsonNode outputRecordJsonNode) {
+     testTransform(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, MERGED_TEXT_INDEX_FIELD_NAME, schema,
+         null, null, null, inputRecordJsonNode.toString(), outputRecordJsonNode.toString());
+   }
+
+   private void transformKeyValueTransformation(Schema schema, Map<String, String> keyMapping,
+       Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve, JsonNode inputRecordJsonNode,
+       JsonNode outputRecordJsonNode) {
+     testTransform(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, MERGED_TEXT_INDEX_FIELD_NAME, schema,
+         keyMapping, fieldPathsToDrop, fieldPathsToPreserve, inputRecordJsonNode.toString(),
+         outputRecordJsonNode.toString());
+   }
+
+   private void testTransform(String indexableExtrasField, String unindexableExtrasField,
String mergedTextIndexField, + Schema schema, Map<String, String> keyMapping, Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve, + String inputRecordJSONString, + String expectedOutputRecordJSONString) { + TableConfig tableConfig = + createDefaultTableConfig(indexableExtrasField, unindexableExtrasField, UNINDEXABLE_FIELD_SUFFIX, + fieldPathsToDrop, fieldPathsToPreserve, mergedTextIndexField); + tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config().setColumnNameToJsonKeyPathMap(keyMapping); + GenericRow outputRecord = transformRow(tableConfig, schema, inputRecordJSONString); + Map<String, Object> expectedOutputRecordMap = jsonStringToMap(expectedOutputRecordJSONString); + + // Merged text index field does not need to have deterministic order + Object mergedTextIndexValue = outputRecord.getFieldToValueMap().get(MERGED_TEXT_INDEX_FIELD_NAME); + Object expectedMergedTextIndexValue = expectedOutputRecordMap.get(MERGED_TEXT_INDEX_FIELD_NAME); + if (mergedTextIndexValue != null) { + ((List<Object>) mergedTextIndexValue).sort(null); + } + if (expectedMergedTextIndexValue != null) { + ((List<Object>) expectedMergedTextIndexValue).sort(null); + } + + Assert.assertNotNull(outputRecord); + Assert.assertEquals(outputRecord.getFieldToValueMap(), expectedOutputRecordMap); + } + + /** + * Transforms the given row (given as a JSON string) using the transformer + * @return The transformed row + */ + private GenericRow transformRow(TableConfig tableConfig, Schema schema, String inputRecordJSONString) { + Map<String, Object> inputRecordMap = jsonStringToMap(inputRecordJSONString); + GenericRow inputRecord = createRowFromMap(inputRecordMap); + SchemaConformingTransformerV2 schemaConformingTransformerV2 = + new SchemaConformingTransformerV2(tableConfig, schema); + return schemaConformingTransformerV2.transform(inputRecord); + } + + /** + * @return A map representing the given JSON string + */ + @Nonnull + private Map<String, Object> jsonStringToMap(String jsonString) { + try { + TypeReference<Map<String, Object>> typeRef = new TypeReference<>() { + }; + return OBJECT_MAPPER.readValue(jsonString, typeRef); + } catch (IOException e) { + fail(e.getMessage()); + } + // Should never reach here + return null; + } + + /** + * @return A new generic row with all the kv-pairs from the given map + */ + private GenericRow createRowFromMap(Map<String, Object> map) { + GenericRow record = new GenericRow(); + for (Map.Entry<String, Object> entry : map.entrySet()) { + record.putValue(entry.getKey(), entry.getValue()); + } + return record; + } + + @Test + public void testOverlappingSchemaFields() { + try { + Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b", DataType.STRING) + .addSingleValueDimension("a.b.c", DataType.INT).build(); + SchemaConformingTransformerV2.validateSchema(schema, + new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null, null, + null, null, null, null, null)); + } catch (Exception ex) { + fail("Should not have thrown any exception when overlapping schema occurs"); + } + + try { + // This is a repeat of the previous test but with fields reversed just in case they are processed in order + Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b.c", DataType.INT) + .addSingleValueDimension("a.b", DataType.STRING).build(); + SchemaConformingTransformerV2.validateSchema(schema, + new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null, null, 
+ null, null, null, null, null)); + } catch (Exception ex) { + fail("Should not have thrown any exception when overlapping schema occurs"); + } + } + + @Test + public void testBase64ValueFilter() { + String text = "Hello world"; + String binaryData = "ABCxyz12345-_+/="; + String binaryDataWithTrailingPeriods = "ABCxyz12345-_+/=.."; + String binaryDataWithRandomPeriods = "A.BCxy.z12345-_+/=.."; + String shortBinaryData = "short"; + int minLength = 10; + + assertFalse(_RECORD_TRANSFORMER.base64ValueFilter(text.getBytes(), minLength)); + assertTrue(_RECORD_TRANSFORMER.base64ValueFilter(binaryData.getBytes(), minLength)); + assertTrue(_RECORD_TRANSFORMER.base64ValueFilter(binaryDataWithTrailingPeriods.getBytes(), minLength)); + assertFalse(_RECORD_TRANSFORMER.base64ValueFilter(binaryDataWithRandomPeriods.getBytes(), minLength)); + assertFalse(_RECORD_TRANSFORMER.base64ValueFilter(shortBinaryData.getBytes(), minLength)); + } + + @Test + public void testShingleIndexTokenization() { + String key = "key"; + String value = "0123456789ABCDEFGHIJ"; + int shingleIndexMaxLength; + int shingleIndexOverlapLength; + List<String> expectedTokenValues; + + shingleIndexMaxLength = 8; + shingleIndexOverlapLength = 1; + expectedTokenValues = new ArrayList<>( + Arrays.asList("0123:key", "3456:key", "6789:key", "9ABC:key", "CDEF:key", "FGHI:key", "IJ:key")); + testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues); + + shingleIndexMaxLength = 8; + shingleIndexOverlapLength = 2; + expectedTokenValues = new ArrayList<>(Arrays + .asList("0123:key", "2345:key", "4567:key", "6789:key", "89AB:key", "ABCD:key", "CDEF:key", "EFGH:key", + "GHIJ:key")); + testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues); + + // If shingleIndexMaxLength is lower than the minimum required length for merged text index token + // (length of the key + shingling overlap length + 1), then the shingleIndexMaxLength is adjusted to + // the maximum Lucene token size (32766) + shingleIndexMaxLength = 1; + shingleIndexOverlapLength = 5; + expectedTokenValues = new ArrayList<>(Arrays.asList(value + ":" + key)); + testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues); + + // If shingleIndexOverlapLength is equal to or longer than the length of the value, shingling cannot be applied and + // only one token is generated. 
+ shingleIndexMaxLength = 32766; + shingleIndexOverlapLength = 100; + expectedTokenValues = new ArrayList<>(Arrays.asList(value + ":" + key)); + testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues); + + // Other corner cases, where the result would be the same as if shingling has not been applied + shingleIndexMaxLength = 300; + shingleIndexOverlapLength = 10; + expectedTokenValues = new ArrayList<>(Arrays.asList(value + ":" + key)); + testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues); + } + + private void testShingleIndexWithParams(String key, String value, Integer shingleIndexMaxLength, + Integer shingleIndexOverlapLength, List<String> expectedTokenValues) { + Map.Entry<String, Object> kv = new AbstractMap.SimpleEntry<>(key, value); + List<String> shingleIndexTokens = new ArrayList<>(); + _RECORD_TRANSFORMER + .generateShingleTextIndexDocument(kv, shingleIndexTokens, shingleIndexMaxLength, shingleIndexOverlapLength); + int numTokens = shingleIndexTokens.size(); + assertEquals(numTokens, expectedTokenValues.size()); + for (int i = 0; i < numTokens; i++) { + assertEquals(shingleIndexTokens.get(i), expectedTokenValues.get(i)); + } + } + + static class CustomObjectNode extends ObjectNode { + public CustomObjectNode() { + super(OBJECT_MAPPER.getNodeFactory()); + } + + public static CustomObjectNode create() { + return new CustomObjectNode(); + } + + public CustomObjectNode set(String fieldName, JsonNode value) { + super.set(fieldName, value); + return this; + } + + public CustomObjectNode setAll(ObjectNode other) { + super.setAll(other); + return this; + } + + public CustomObjectNode removeAndReturn(String fieldName) { + super.remove(fieldName); + return this; + } + + public CustomObjectNode deepCopy() { + return CustomObjectNode.create().setAll(this); + } + } + + static { + ServerMetrics.register(mock(ServerMetrics.class)); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/Base64UtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/Base64UtilsTest.java new file mode 100644 index 0000000000..e067321404 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/Base64UtilsTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils; + +import java.nio.charset.StandardCharsets; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class Base64UtilsTest { + private static final String SPECIAL_CHARS = "+/=-_"; + private static final String UPPER_CASE_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + private static final String LOWER_CASE_CHARS = "abcdefghijklmnopqrstuvwxyz"; + private static final String NUMBER_CHARS = "0123456789"; + private static final String[] BASE64_STRINGS = { + UPPER_CASE_CHARS, + LOWER_CASE_CHARS, + SPECIAL_CHARS, + NUMBER_CHARS, + SPECIAL_CHARS + NUMBER_CHARS + LOWER_CASE_CHARS + UPPER_CASE_CHARS, + UPPER_CASE_CHARS + SPECIAL_CHARS + LOWER_CASE_CHARS + NUMBER_CHARS + }; + private static final String[] BASE64_STRINGS_WITH_WHITE_SPACE = { + SPECIAL_CHARS + "\n" + NUMBER_CHARS + "\t" + LOWER_CASE_CHARS + " " + UPPER_CASE_CHARS, + UPPER_CASE_CHARS + "\n" + SPECIAL_CHARS + "\t" + LOWER_CASE_CHARS + " " + NUMBER_CHARS + }; + private static final String[] NON_BASE64_STRINGS = { + UPPER_CASE_CHARS + "!", + LOWER_CASE_CHARS + "@", + SPECIAL_CHARS + "#", + NUMBER_CHARS + "$", + SPECIAL_CHARS + ".." + NUMBER_CHARS + "?" + LOWER_CASE_CHARS + "^" + UPPER_CASE_CHARS + "*" + }; + + @Test + public void testBase64IgnoreWhiteSpace() { + for (final String s : BASE64_STRINGS) { + assertTrue(Base64Utils.isBase64IgnoreWhiteSpace(s.getBytes(StandardCharsets.UTF_8))); + assertFalse(Base64Utils.isBase64IgnoreWhiteSpace((s + "..").getBytes(StandardCharsets.UTF_8))); + } + + for (final String s : BASE64_STRINGS_WITH_WHITE_SPACE) { + assertTrue(Base64Utils.isBase64IgnoreWhiteSpace(s.getBytes(StandardCharsets.UTF_8))); + assertFalse(Base64Utils.isBase64IgnoreWhiteSpace((s + "..").getBytes(StandardCharsets.UTF_8))); + } + + for (final String s : NON_BASE64_STRINGS) { + assertFalse(Base64Utils.isBase64IgnoreWhiteSpace(s.getBytes(StandardCharsets.UTF_8))); + assertFalse(Base64Utils.isBase64IgnoreWhiteSpace((s + "..").getBytes(StandardCharsets.UTF_8))); + } + } + + @Test + public void testBase64IgnoreTrailingPeriods() { + for (final String s : BASE64_STRINGS) { + String testStr = s; + for (int i = 0; i < 10; i++) { + assertTrue(Base64Utils.isBase64IgnoreTrailingPeriods(testStr.getBytes(StandardCharsets.UTF_8))); + testStr = testStr + "."; + } + } + + for (final String s : BASE64_STRINGS_WITH_WHITE_SPACE) { + String testStr = s; + for (int i = 0; i < 2; i++) { + assertFalse(Base64Utils.isBase64IgnoreTrailingPeriods(testStr.getBytes(StandardCharsets.UTF_8))); + testStr = testStr + "."; + } + } + + for (final String s : NON_BASE64_STRINGS) { + String testStr = s; + for (int i = 0; i < 2; i++) { + assertFalse(Base64Utils.isBase64IgnoreTrailingPeriods(testStr.getBytes(StandardCharsets.UTF_8))); + testStr = testStr + "."; + } + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java index 5af9cdcc50..358cf35a43 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java @@ -51,6 +51,9 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Config related to the SchemaConformingTransformer") private SchemaConformingTransformerConfig _schemaConformingTransformerConfig; + @JsonPropertyDescription("Config 
related to the SchemaConformingTransformerV2") + private SchemaConformingTransformerV2Config _schemaConformingTransformerV2Config; + @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion") private List<AggregationConfig> _aggregationConfigs; @@ -69,6 +72,7 @@ public class IngestionConfig extends BaseJsonConfig { @Nullable List<EnrichmentConfig> enrichmentConfigs, @Nullable List<TransformConfig> transformConfigs, @Nullable ComplexTypeConfig complexTypeConfig, @Nullable SchemaConformingTransformerConfig schemaConformingTransformerConfig, + @Nullable SchemaConformingTransformerV2Config schemaConformingTransformerV2Config, @Nullable List<AggregationConfig> aggregationConfigs) { _batchIngestionConfig = batchIngestionConfig; _streamIngestionConfig = streamIngestionConfig; @@ -77,6 +81,7 @@ public class IngestionConfig extends BaseJsonConfig { _transformConfigs = transformConfigs; _complexTypeConfig = complexTypeConfig; _schemaConformingTransformerConfig = schemaConformingTransformerConfig; + _schemaConformingTransformerV2Config = schemaConformingTransformerV2Config; _aggregationConfigs = aggregationConfigs; } @@ -118,6 +123,11 @@ public class IngestionConfig extends BaseJsonConfig { return _schemaConformingTransformerConfig; } + @Nullable + public SchemaConformingTransformerV2Config getSchemaConformingTransformerV2Config() { + return _schemaConformingTransformerV2Config; + } + @Nullable public List<AggregationConfig> getAggregationConfigs() { return _aggregationConfigs; @@ -164,6 +174,11 @@ public class IngestionConfig extends BaseJsonConfig { _schemaConformingTransformerConfig = schemaConformingTransformerConfig; } + public void setSchemaConformingTransformerV2Config( + SchemaConformingTransformerV2Config schemaConformingTransformerV2Config) { + _schemaConformingTransformerV2Config = schemaConformingTransformerV2Config; + } + public void setAggregationConfigs(List<AggregationConfig> aggregationConfigs) { _aggregationConfigs = aggregationConfigs; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java index 96231f39d9..e51eb65e4a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java @@ -31,8 +31,8 @@ public class SchemaConformingTransformerConfig extends BaseJsonConfig { @JsonPropertyDescription("Name of the field that should contain extra fields that are not part of the schema.") private final String _indexableExtrasField; - @JsonPropertyDescription( - "Like indexableExtrasField except it only contains fields with the suffix in unindexableFieldSuffix.") + @JsonPropertyDescription("Like indexableExtrasField except it only contains fields with the suffix in " + + "unindexableFieldSuffix.") private final String _unindexableExtrasField; @JsonPropertyDescription("The suffix of fields that must be stored in unindexableExtrasField") diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java new file mode 100644 index 0000000000..1d58d76f81 --- /dev/null +++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table.ingestion; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.BaseJsonConfig; + +public class SchemaConformingTransformerV2Config extends BaseJsonConfig { + @JsonPropertyDescription("Enable indexable extras") + private boolean _enableIndexableExtras = true; + + @JsonPropertyDescription("Name of the field that should contain extra fields that are not part of the schema.") + private String _indexableExtrasField = "json_data"; + + @JsonPropertyDescription("Enable unindexable extras") + private boolean _enableUnindexableExtras = true; + + @JsonPropertyDescription( + "Like indexableExtrasField except it only contains fields with the suffix in unindexableFieldSuffix.") + private String _unindexableExtrasField = "json_data_no_idx"; + + @JsonPropertyDescription("The suffix of fields that must be stored in unindexableExtrasField") + private String _unindexableFieldSuffix = "_noindex"; + + @JsonPropertyDescription("Array of flattened (dot-delimited) object paths to drop") + private Set<String> _fieldPathsToDrop = new HashSet<>(); + + @JsonPropertyDescription("Array of flattened (dot-delimited) object paths not to traverse further and keep same as " + + "input. This will also skip building mergedTextIndex for the field.") + private Set<String> _fieldPathsToPreserveInput = new HashSet<>(); + + @JsonPropertyDescription("Map from customized meaningful column name to json key path") + private Map<String, String> _columnNameToJsonKeyPathMap = new HashMap<>(); + + @JsonPropertyDescription("mergedTextIndex field") + private String _mergedTextIndexField = "__mergedTextIndex"; + + @JsonPropertyDescription("mergedTextIndex document max length") + private int _mergedTextIndexDocumentMaxLength = 32766; + + @JsonPropertyDescription( + "Recall that merged text index document is in the format of <value:key>. " + + "The mergedTextIndex shingling overlap length refers to the " + + "maximum search length of the value that will yield results with " + + "100% accuracy. 
If the value is null, shingle index will be turned off "
+       + "and the value will be truncated so that the document length is equal to "
+       + "_mergedTextIndexDocumentMaxLength"
+   )
+   private @Nullable Integer _mergedTextIndexShinglingOverlapLength = null;
+
+   @JsonPropertyDescription("mergedTextIndex binary document detection minimum length")
+   private Integer _mergedTextIndexBinaryDocumentDetectionMinLength = 512;
+
+   @JsonPropertyDescription("Array of paths to exclude from merged text index.")
+   private Set<String> _mergedTextIndexPathToExclude = new HashSet<>();
+
+   // TODO: set default value from CLPRewriter once it is open sourced
+   @JsonPropertyDescription("Array of suffixes to exclude from merged text index.")
+   private List<String> _mergedTextIndexSuffixToExclude = Arrays.asList("_logtype", "_dictionaryVars", "_encodedVars");
+
+   @JsonPropertyDescription("Dedicated fields to double ingest into json_data column")
+   private Set<String> _fieldsToDoubleIngest = new HashSet<>();
+
+   @JsonCreator
+   public SchemaConformingTransformerV2Config(
+       @JsonProperty("enableIndexableExtras") @Nullable Boolean enableIndexableExtras,
+       @JsonProperty("indexableExtrasField") String indexableExtrasField,
+       @JsonProperty("enableUnindexableExtras") @Nullable Boolean enableUnindexableExtras,
+       @JsonProperty("unindexableExtrasField") @Nullable String unindexableExtrasField,
+       @JsonProperty("unindexableFieldSuffix") @Nullable String unindexableFieldSuffix,
+       @JsonProperty("fieldPathsToDrop") @Nullable Set<String> fieldPathsToDrop,
+       @JsonProperty("fieldPathsToKeepSameAsInput") @Nullable Set<String> fieldPathsToPreserveInput,
+       @JsonProperty("mergedTextIndexField") @Nullable String mergedTextIndexField,
+       @JsonProperty("mergedTextIndexDocumentMaxLength") @Nullable Integer mergedTextIndexDocumentMaxLength,
+       @JsonProperty("mergedTextIndexShinglingOverlapLength") @Nullable Integer mergedTextIndexShinglingOverlapLength,
+       @JsonProperty("mergedTextIndexBinaryDocumentDetectionMinLength")
+       @Nullable Integer mergedTextIndexBinaryDocumentDetectionMinLength,
+       @JsonProperty("mergedTextIndexPathToExclude") @Nullable Set<String> mergedTextIndexPathToExclude,
+       @JsonProperty("fieldsToDoubleIngest") @Nullable Set<String> fieldsToDoubleIngest
+   ) {
+     setEnableIndexableExtras(enableIndexableExtras);
+     setIndexableExtrasField(indexableExtrasField);
+     setEnableUnindexableExtras(enableUnindexableExtras);
+     setUnindexableExtrasField(unindexableExtrasField);
+     setUnindexableFieldSuffix(unindexableFieldSuffix);
+     setFieldPathsToDrop(fieldPathsToDrop);
+     setFieldPathsToPreserveInput(fieldPathsToPreserveInput);
+
+     setMergedTextIndexField(mergedTextIndexField);
+     setMergedTextIndexDocumentMaxLength(mergedTextIndexDocumentMaxLength);
+     setMergedTextIndexShinglingDocumentOverlapLength(mergedTextIndexShinglingOverlapLength);
+     setMergedTextIndexBinaryDocumentDetectionMinLength(mergedTextIndexBinaryDocumentDetectionMinLength);
+     setMergedTextIndexPathToExclude(mergedTextIndexPathToExclude);
+     setFieldsToDoubleIngest(fieldsToDoubleIngest);
+   }
+
+   public SchemaConformingTransformerV2Config setEnableIndexableExtras(Boolean enableIndexableExtras) {
+     _enableIndexableExtras = enableIndexableExtras == null ? _enableIndexableExtras : enableIndexableExtras;
+     return this;
+   }
+
+   public String getIndexableExtrasField() {
+     return _enableIndexableExtras ? _indexableExtrasField : null;
+   }
+
+   public SchemaConformingTransformerV2Config setIndexableExtrasField(String indexableExtrasField) {
+     _indexableExtrasField = indexableExtrasField == null ?
_indexableExtrasField : indexableExtrasField; + return this; + } + + public SchemaConformingTransformerV2Config setEnableUnindexableExtras(Boolean enableUnindexableExtras) { + _enableUnindexableExtras = enableUnindexableExtras == null ? _enableUnindexableExtras : enableUnindexableExtras; + return this; + } + + public String getUnindexableExtrasField() { + return _enableUnindexableExtras ? _unindexableExtrasField : null; + } + + public SchemaConformingTransformerV2Config setUnindexableExtrasField(String unindexableExtrasField) { + _unindexableExtrasField = unindexableExtrasField == null ? _unindexableExtrasField : unindexableExtrasField; + return this; + } + + public String getUnindexableFieldSuffix() { + return _unindexableFieldSuffix; + } + + public SchemaConformingTransformerV2Config setUnindexableFieldSuffix(String unindexableFieldSuffix) { + _unindexableFieldSuffix = unindexableFieldSuffix == null ? _unindexableFieldSuffix : unindexableFieldSuffix; + return this; + } + + public Set<String> getFieldPathsToDrop() { + return _fieldPathsToDrop; + } + + public SchemaConformingTransformerV2Config setFieldPathsToDrop(Set<String> fieldPathsToDrop) { + _fieldPathsToDrop = fieldPathsToDrop == null ? _fieldPathsToDrop : fieldPathsToDrop; + return this; + } + + public Set<String> getFieldPathsToPreserveInput() { + return _fieldPathsToPreserveInput; + } + + public SchemaConformingTransformerV2Config setFieldPathsToPreserveInput(Set<String> fieldPathsToPreserveInput) { + _fieldPathsToPreserveInput = fieldPathsToPreserveInput == null ? _fieldPathsToPreserveInput + : fieldPathsToPreserveInput; + return this; + } + + public Map<String, String> getColumnNameToJsonKeyPathMap() { + return _columnNameToJsonKeyPathMap; + } + + public SchemaConformingTransformerV2Config setColumnNameToJsonKeyPathMap( + Map<String, String> columnNameToJsonKeyPathMap) { + _columnNameToJsonKeyPathMap = columnNameToJsonKeyPathMap == null + ? _columnNameToJsonKeyPathMap : columnNameToJsonKeyPathMap; + return this; + } + + public String getMergedTextIndexField() { + return _mergedTextIndexField; + } + + public SchemaConformingTransformerV2Config setMergedTextIndexField(String mergedTextIndexField) { + _mergedTextIndexField = mergedTextIndexField == null ? _mergedTextIndexField : mergedTextIndexField; + return this; + } + + public Integer getMergedTextIndexDocumentMaxLength() { + return _mergedTextIndexDocumentMaxLength; + } + + public SchemaConformingTransformerV2Config setMergedTextIndexDocumentMaxLength( + Integer mergedTextIndexDocumentMaxLength + ) { + _mergedTextIndexDocumentMaxLength = mergedTextIndexDocumentMaxLength == null + ? _mergedTextIndexDocumentMaxLength : mergedTextIndexDocumentMaxLength; + return this; + } + + public Integer getMergedTextIndexShinglingOverlapLength() { + return _mergedTextIndexShinglingOverlapLength; + } + + public SchemaConformingTransformerV2Config setMergedTextIndexShinglingDocumentOverlapLength( + Integer mergedTextIndexShinglingOverlapLength) { + _mergedTextIndexShinglingOverlapLength = mergedTextIndexShinglingOverlapLength; + return this; + } + + public Integer getMergedTextIndexBinaryDocumentDetectionMinLength() { + return _mergedTextIndexBinaryDocumentDetectionMinLength; + } + + public SchemaConformingTransformerV2Config setMergedTextIndexBinaryDocumentDetectionMinLength( + Integer mergedTextIndexBinaryDocumentDetectionMinLength) { + _mergedTextIndexBinaryDocumentDetectionMinLength = mergedTextIndexBinaryDocumentDetectionMinLength == null + ? 
_mergedTextIndexBinaryDocumentDetectionMinLength : mergedTextIndexBinaryDocumentDetectionMinLength; + return this; + } + + public Set<String> getMergedTextIndexPathToExclude() { + return _mergedTextIndexPathToExclude; + } + + public List<String> getMergedTextIndexSuffixToExclude() { + return _mergedTextIndexSuffixToExclude; + } + + public SchemaConformingTransformerV2Config setMergedTextIndexPathToExclude(Set<String> mergedTextIndexPathToExclude) { + _mergedTextIndexPathToExclude = mergedTextIndexPathToExclude == null + ? _mergedTextIndexPathToExclude : mergedTextIndexPathToExclude; + return this; + } + + public Set<String> getFieldsToDoubleIngest() { + return _fieldsToDoubleIngest; + } + + public SchemaConformingTransformerV2Config setFieldsToDoubleIngest(Set<String> fieldsToDoubleIngest) { + _fieldsToDoubleIngest = fieldsToDoubleIngest == null ? _fieldsToDoubleIngest : fieldsToDoubleIngest; + return this; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
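A few illustrative sketches follow for readers trying out the new transformer. They are editorial examples inferred from the classes added above, not part of the committed patch.

Enabling the transformer on a table mirrors createDefaultBasicTableConfig in the test: build a SchemaConformingTransformerV2Config, attach it to the IngestionConfig, and set that on the table config. A minimal sketch (the table name is a placeholder; the trailing nulls leave the merged-text-index tuning at its defaults):

import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class SchemaConformingTransformerV2Example {
  public static TableConfig buildTableConfig() {
    // Schema-less fields go to "json_data"; fields suffixed "_noindex" go to "json_data_no_idx"
    SchemaConformingTransformerV2Config transformerConfig = new SchemaConformingTransformerV2Config(
        true, "json_data",            // enableIndexableExtras, indexableExtrasField
        true, "json_data_no_idx",     // enableUnindexableExtras, unindexableExtrasField
        "_noindex",                   // unindexableFieldSuffix
        null, null,                   // fieldPathsToDrop, fieldPathsToPreserveInput
        "__mergedTextIndex",          // mergedTextIndexField
        null, null, null, null, null);  // remaining optional merged-text-index settings
    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setSchemaConformingTransformerV2Config(transformerConfig);
    return new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("myTable")  // placeholder table name
        .setIngestionConfig(ingestionConfig)
        .build();
  }
}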
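On the schema side, the two extras columns are single-value JSON dimensions and the merged-text-index column is a multi-value STRING, as in createDefaultSchemaBuilder in the test above. A real table would additionally declare a text index on the merged column (via fieldConfigList), which is outside this excerpt. Sketch:

import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;

public class TransformerV2SchemaExample {
  static Schema buildSchema() {
    return new Schema.SchemaBuilder()
        .setSchemaName("myTable")  // placeholder schema name
        .addSingleValueDimension("json_data", DataType.JSON)           // indexable extras
        .addSingleValueDimension("json_data_no_idx", DataType.JSON)    // unindexable extras
        .addMultiValueDimension("__mergedTextIndex", DataType.STRING)  // merged text index documents
        .build();
  }
}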
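Each merged-text-index document has the shape <value>:<flattened key path>, e.g. "a:nestedFields.stringField" in the expected outputs above. A rough sketch of the flattening idea only; the committed SchemaConformingTransformerV2 (727 lines, not inlined in this excerpt) additionally handles arrays, exclusion paths and suffixes, truncation, and binary detection:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class MergedTextIndexSketch {
  // Collects "<value>:<dot-delimited key>" documents from a nested record.
  @SuppressWarnings("unchecked")
  static void collectDocuments(String prefix, Map<String, Object> node, List<String> out) {
    for (Map.Entry<String, Object> entry : node.entrySet()) {
      String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
      Object value = entry.getValue();
      if (value instanceof Map) {
        collectDocuments(key, (Map<String, Object>) value, out);  // recurse into nested objects
      } else if (value != null) {
        out.add(value + ":" + key);  // null values produce no document, matching the tests
      }
    }
  }
}

For an input like {"nestedFields":{"stringField":"a"}} this yields "a:nestedFields.stringField", matching the expected __mergedTextIndex arrays in the tests.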
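The shingling corner cases asserted in testShingleIndexTokenization follow from two quantities: the value-chunk length, maxLength - keyLength - 1 (one character is reserved for the ':' separator), and the stride, chunkLength - overlapLength. A sketch that reproduces the expected token lists above; the committed generateShingleTextIndexDocument may differ in details:

import java.util.ArrayList;
import java.util.List;

public final class ShingleSketch {
  static List<String> shingle(String key, String value, int maxLength, int overlapLength) {
    List<String> tokens = new ArrayList<>();
    if (maxLength < key.length() + overlapLength + 1) {
      maxLength = 32766;  // fall back to the maximum Lucene token size, per the test comment
    }
    int chunkLength = maxLength - key.length() - 1;  // room left for the value within one token
    if (overlapLength >= value.length() || chunkLength >= value.length() || chunkLength <= overlapLength) {
      tokens.add(value + ":" + key);  // shingling not applicable: emit a single document
      return tokens;
    }
    int start = 0;
    while (true) {
      int end = Math.min(start + chunkLength, value.length());
      tokens.add(value.substring(start, end) + ":" + key);
      if (end == value.length()) {
        break;  // stop once a chunk reaches the end of the value
      }
      start += chunkLength - overlapLength;
    }
    return tokens;
  }
}

With key "key", value "0123456789ABCDEFGHIJ", maxLength 8 and overlap 1, this produces 0123:key, 3456:key, ..., IJ:key, i.e. the first expected list in the test; overlap 2 gives the nine-token list.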
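Finally, base64ValueFilter (exercised by testBase64ValueFilter above) decides whether a value looks like an encoded binary blob that should be kept out of the merged text index. Judging from the assertions and from Base64UtilsTest, the check is essentially a minimum-length gate combined with Base64Utils.isBase64IgnoreTrailingPeriods; a sketch under that assumption:

import org.apache.pinot.segment.local.utils.Base64Utils;

public final class Base64FilterSketch {
  // True when the value is long enough and consists solely of base64 characters
  // (trailing periods tolerated), i.e. it is likely binary payload rather than text.
  static boolean looksLikeBase64Blob(byte[] bytes, int minLength) {
    return bytes.length >= minLength && Base64Utils.isBase64IgnoreTrailingPeriods(bytes);
  }
}

This is consistent with the test: "Hello world" fails (the space is not a base64 character for this check), "ABCxyz12345-_+/=.." passes despite the trailing periods, and "short" fails the length gate.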