This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 13673f1150 Add SchemaConformingTransformerV2 to enhance text search 
abilities (#12788)
13673f1150 is described below

commit 13673f11508f00ae35f5bb12f9cf97b6897ebfba
Author: lnbest0707 <106711887+lnbest0707-u...@users.noreply.github.com>
AuthorDate: Tue Apr 9 16:11:06 2024 -0700

    Add SchemaConformingTransformerV2 to enhance text search abilities (#12788)
    
    * Add SchemaConformingTransformerV2 to enhance text search abilities
    
    * Fix style
    
    * Update __mergedTextIndex field logic
    
    * Fix UT
    
    * Resolve comments and add fieldPathsToPreserveInput config
---
 .../apache/pinot/common/metrics/ServerGauge.java   |   1 +
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../apache/pinot/queries/TransformQueriesTest.java |   2 +-
 .../recordtransformer/CompositeTransformer.java    |  12 +-
 .../SchemaConformingTransformer.java               |  31 +-
 .../SchemaConformingTransformerV2.java             | 727 ++++++++++++++++
 .../pinot/segment/local/utils/Base64Utils.java     |  44 +
 .../pinot/segment/local/utils/IngestionUtils.java  |   3 +-
 .../segment/local/utils/TableConfigUtils.java      |   8 +
 .../SchemaConformingTransformerV2Test.java         | 934 +++++++++++++++++++++
 .../pinot/segment/local/utils/Base64UtilsTest.java |  96 +++
 .../config/table/ingestion/IngestionConfig.java    |  15 +
 .../SchemaConformingTransformerConfig.java         |   4 +-
 .../SchemaConformingTransformerV2Config.java       | 253 ++++++
 14 files changed, 2120 insertions(+), 11 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 45f34803a0..f0a1fdd136 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -38,6 +38,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS("seconds", false),
   LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS("seconds", false),
   REALTIME_OFFHEAP_MEMORY_USED("bytes", false),
+  REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN("bytes", false),
   REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false),
   LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
   // Upsert metrics
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 02005a3814..ed9769a68e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -40,6 +40,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   INVALID_REALTIME_ROWS_DROPPED("rows", false),
   INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
+  REALTIME_MERGED_TEXT_IDX_TRUNCATED_DOCUMENT_SIZE("bytes", false),
   REALTIME_OFFSET_COMMITS("commits", true),
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
   STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 1f04d16d3b..cfb570d80e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -135,7 +135,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
         .setIngestionConfig(new IngestionConfig(null, null, null, null,
             Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3  == 
null || "
                 + "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 
}, INT_COL1, INT_COL1_V3)")),
-            null, null, null))
+            null, null, null, null))
         .build();
     Schema schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index cf88629f10..a1bfcba52a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -54,8 +54,13 @@ public class CompositeTransformer implements RecordTransformer {
    *     records that have varying fields to a fixed schema without dropping 
any fields
    *   </li>
    *   <li>
-   *     {@link DataTypeTransformer} after {@link SchemaConformingTransformer} 
to convert values to comply with the
-   *     schema
+   *     Optional {@link SchemaConformingTransformerV2} after {@link FilterTransformer}, so that we can transform
+   *     input records with varying fields to a fixed schema and keep or drop other fields by configuration. It
+   *     also provides enhanced text search capabilities.
+   *   </li>
+   *   <li>
+   *     {@link DataTypeTransformer} after {@link SchemaConformingTransformer} 
or {@link SchemaConformingTransformerV2}
+   *     to convert values to comply with the schema
    *   </li>
    *   <li>
    *     Optional {@link TimeValidationTransformer} after {@link 
DataTypeTransformer} so that time value is converted to
@@ -78,7 +83,8 @@ public class CompositeTransformer implements RecordTransformer {
    */
   public static List<RecordTransformer> getDefaultTransformers(TableConfig 
tableConfig, Schema schema) {
     return Stream.of(new ExpressionTransformer(tableConfig, schema), new 
FilterTransformer(tableConfig),
-            new SchemaConformingTransformer(tableConfig, schema), new 
DataTypeTransformer(tableConfig, schema),
+            new SchemaConformingTransformer(tableConfig, schema),
+            new SchemaConformingTransformerV2(tableConfig, schema), new 
DataTypeTransformer(tableConfig, schema),
             new TimeValidationTransformer(tableConfig, schema), new 
SpecialValueTransformer(schema),
             new NullValueTransformer(tableConfig, schema), new 
SanitizationTransformer(schema)).filter(t -> !t.isNoOp())
         .collect(Collectors.toList());
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
index 0d01e68f35..b9cfdce5a8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformer.java
@@ -174,7 +174,7 @@ public class SchemaConformingTransformer implements RecordTransformer {
   /**
    * @return The field type for the given extras field
    */
-  private static DataType getAndValidateExtrasFieldType(Schema schema, 
@Nonnull String extrasFieldName) {
+  static DataType getAndValidateExtrasFieldType(Schema schema, @Nonnull String 
extrasFieldName) {
     FieldSpec fieldSpec = schema.getFieldSpecFor(extrasFieldName);
     Preconditions.checkState(null != fieldSpec, "Field '%s' doesn't exist in 
schema", extrasFieldName);
     DataType fieldDataType = fieldSpec.getDataType();
@@ -250,7 +250,7 @@ public class SchemaConformingTransformer implements RecordTransformer {
    * @param subKeys Returns the sub-keys
    * @throws IllegalArgumentException if any sub-key is empty
    */
-  private static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys)
+  static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys)
       throws IllegalArgumentException {
     int subKeyBeginIdx = 0;
     int subKeyEndIdx = firstKeySeparatorIdx;
@@ -511,7 +511,16 @@ class ExtraFieldsContainer {
     if (null == _indexableExtras) {
       _indexableExtras = new HashMap<>();
     }
-    _indexableExtras.put(key, value);
+    if (key == null && value instanceof Map) {
+      // If the key is null, it means that the value is a map that should be 
merged with the indexable extras
+      _indexableExtras.putAll((Map<String, Object>) value);
+    } else if (_indexableExtras.containsKey(key) && _indexableExtras.get(key) 
instanceof Map && value instanceof Map) {
+      // If the key already exists in the indexable extras and both the 
existing value and the new value are maps,
+      // merge the two maps
+      ((Map<String, Object>) _indexableExtras.get(key)).putAll((Map<String, 
Object>) value);
+    } else {
+      _indexableExtras.put(key, value);
+    }
   }
 
   /**
@@ -524,7 +533,17 @@ class ExtraFieldsContainer {
     if (null == _unindexableExtras) {
       _unindexableExtras = new HashMap<>();
     }
-    _unindexableExtras.put(key, value);
+    if (key == null && value instanceof Map) {
+      // If the key is null, it means that the value is a map that should be 
merged with the unindexable extras
+      _unindexableExtras.putAll((Map<String, Object>) value);
+    } else if (_unindexableExtras.containsKey(key) && 
_unindexableExtras.get(key) instanceof Map
+        && value instanceof Map) {
+      // If the key already exists in the unindexable extras and both the existing value and the new value are maps,
+      // merge the two maps
+      ((Map<String, Object>) _unindexableExtras.get(key)).putAll((Map<String, 
Object>) value);
+    } else {
+      _unindexableExtras.put(key, value);
+    }
   }
 
   /**
@@ -542,4 +561,8 @@ class ExtraFieldsContainer {
       addUnindexableEntry(key, childUnindexableFields);
     }
   }
+
+  public void addChild(ExtraFieldsContainer child) {
+    addChild(null, child);
+  }
 }
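
The addIndexableEntry/addUnindexableEntry changes above merge map values one level deep instead of overwriting them. A minimal standalone sketch of that merge semantics, with hypothetical keys and values (not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    public class ExtrasMergeSketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        Map<String, Object> extras = new HashMap<>();
        extras.put("c", new HashMap<>(Map.of("d", 3)));

        // The new value for the same key is also a map, so its entries are merged
        // into the existing map rather than replacing it.
        Object value = Map.of("x", 9);
        if (extras.get("c") instanceof Map && value instanceof Map) {
          ((Map<String, Object>) extras.get("c")).putAll((Map<String, Object>) value);
        } else {
          extras.put("c", value);
        }

        System.out.println(extras); // prints {c={d=3, x=9}}
      }
    }
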
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java
new file mode 100644
index 0000000000..5471f784bc
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2.java
@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.utils.Base64Utils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This transformer evolves from {@link SchemaConformingTransformer} and is designed to support extra cases for
+ * better text searching:
+ *   - Supports overlapping schema fields, e.g., schema columns "a" and "a.b" can coexist. Only fields of
+ *     primitive types are allowed as leaf values.
+ *   - Extracts flattened key-value pairs into the merged text index field for better text searching.
+ *   - Adds shingle index tokenization for extremely large text fields.
+ * <p>
+ * For example, consider this record:
+ * <pre>
+ * {
+ *   "a": 1,
+ *   "b": "2",
+ *   "c": {
+ *     "d": 3,
+ *     "e_noindex": 4,
+ *     "f_noindex": {
+ *       "g": 5
+ *     },
+ *     "x": {
+ *       "y": 9,
+ *       "z_noindex": 10
+ *     }
+ *   },
+ *   "h_noindex": "6",
+ *   "i_noindex": {
+ *     "j": 7,
+ *     "k": 8
+ *   }
+ * }
+ * </pre>
+ * And let's say the table's schema contains these fields:
+ * <ul>
+ *   <li>a</li>
+ *   <li>c</li>
+ *   <li>c.d</li>
+ * </ul>
+ * <p>
+ * The record would be transformed into the following (refer to {@link SchemaConformingTransformerV2Config} for
+ * default constant values):
+ * <pre>
+ * {
+ *   "a": 1,
+ *   "c": null,
+ *   "c.d": 3,
+ *   "json_data": {
+ *     "b": "2",
+ *     "c": {
+ *       "x": {
+ *         "y": 9
+ *       }
+ *     }
+ *   },
+ *   "json_data_no_idx": {
+ *     "c": {
+ *       "e_noindex": 4,
+ *       "f_noindex": {
+ *         "g": 5
+ *       },
+ *       "x": {
+ *         "z_noindex": 10
+ *       }
+ *     },
+ *     "h_noindex": "6",
+ *     "i_noindex": {
+ *       "j": 7,
+ *       "k": 8
+ *     }
+ *   },
+ *   "__mergedTextIndex": [
+ *     "1:a", "2:b", "3:c.d", "9:c.x.y"
+ *   ]
+ * }
+ * </pre>
+ * <p>
+ * The "__mergedTextIndex" could filter and manipulate the data based on the 
configuration in
+ * {@link SchemaConformingTransformerV2Config}.
+ */
+public class SchemaConformingTransformerV2 implements RecordTransformer {
+  private static final Logger _logger = 
LoggerFactory.getLogger(SchemaConformingTransformerV2.class);
+  private static final int MAXIMUM_LUCENE_DOCUMENT_SIZE = 32766;
+  private static final String MIN_DOCUMENT_LENGTH_DESCRIPTION =
+      "key length + `:` + shingle index overlap length + one non-overlap char";
+
+  private final boolean _continueOnError;
+  private final SchemaConformingTransformerV2Config _transformerConfig;
+  private final DataType _indexableExtrasFieldType;
+  private final DataType _unindexableExtrasFieldType;
+  private final DimensionFieldSpec _mergedTextIndexFieldSpec;
+  @Nullable
+  ServerMetrics _serverMetrics = null;
+  private SchemaTreeNode _schemaTree;
+  @Nullable
+  private PinotMeter _realtimeMergedTextIndexTruncatedDocumentSizeMeter = null;
+  private String _tableName;
+  private long _mergedTextIndexDocumentBytesCount = 0L;
+  private long _mergedTextIndexDocumentCount = 0L;
+
+  public SchemaConformingTransformerV2(TableConfig tableConfig, Schema schema) 
{
+    if (null == tableConfig.getIngestionConfig() || null == 
tableConfig.getIngestionConfig()
+        .getSchemaConformingTransformerV2Config()) {
+      _continueOnError = false;
+      _transformerConfig = null;
+      _indexableExtrasFieldType = null;
+      _unindexableExtrasFieldType = null;
+      _mergedTextIndexFieldSpec = null;
+      return;
+    }
+
+    _continueOnError = tableConfig.getIngestionConfig().isContinueOnError();
+    _transformerConfig = 
tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config();
+    String indexableExtrasFieldName = 
_transformerConfig.getIndexableExtrasField();
+    _indexableExtrasFieldType =
+        indexableExtrasFieldName == null ? null : 
SchemaConformingTransformer.getAndValidateExtrasFieldType(schema,
+            indexableExtrasFieldName);
+    String unindexableExtrasFieldName = 
_transformerConfig.getUnindexableExtrasField();
+    _unindexableExtrasFieldType =
+        unindexableExtrasFieldName == null ? null : 
SchemaConformingTransformer.getAndValidateExtrasFieldType(schema,
+            unindexableExtrasFieldName);
+    _mergedTextIndexFieldSpec = 
schema.getDimensionSpec(_transformerConfig.getMergedTextIndexField());
+    _tableName = tableConfig.getTableName();
+    _schemaTree = validateSchemaAndCreateTree(schema, _transformerConfig);
+    _serverMetrics = ServerMetrics.get();
+  }
+
+  /**
+   * Validates the schema against the given transformer's configuration.
+   */
+  public static void validateSchema(@Nonnull Schema schema,
+      @Nonnull SchemaConformingTransformerV2Config transformerConfig) {
+    validateSchemaFieldNames(schema.getPhysicalColumnNames(), 
transformerConfig);
+
+    String indexableExtrasFieldName = 
transformerConfig.getIndexableExtrasField();
+    if (null != indexableExtrasFieldName) {
+      SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, 
indexableExtrasFieldName);
+    }
+    String unindexableExtrasFieldName = 
transformerConfig.getUnindexableExtrasField();
+    if (null != unindexableExtrasFieldName) {
+      SchemaConformingTransformer.getAndValidateExtrasFieldType(schema, unindexableExtrasFieldName);
+    }
+
+    validateSchemaAndCreateTree(schema, transformerConfig);
+  }
+
+  /**
+   * Heuristic filter to detect whether a byte array is longer than a 
specified length and contains only base64
+   * characters so that we treat it as encoded binary data.
+   * @param bytes array to check
+   * @param minLength byte array shorter than this length will not be treated 
as encoded binary data
+   * @return true if the input bytes are base64-encoded binary data per the heuristic above, false otherwise
+   */
+  public static boolean base64ValueFilter(final byte[] bytes, int minLength) {
+    return bytes.length >= minLength && 
Base64Utils.isBase64IgnoreTrailingPeriods(bytes);
+  }
+
+  /**
+   * Validates that none of the schema fields have names that conflict with 
the transformer's configuration.
+   */
+  private static void validateSchemaFieldNames(Set<String> schemaFields,
+      SchemaConformingTransformerV2Config transformerConfig) {
+    // Validate that none of the columns in the schema end with 
unindexableFieldSuffix
+    String unindexableFieldSuffix = 
transformerConfig.getUnindexableFieldSuffix();
+    if (null != unindexableFieldSuffix) {
+      for (String field : schemaFields) {
+        Preconditions.checkState(!field.endsWith(unindexableFieldSuffix), 
"Field '%s' has no-index suffix '%s'", field,
+            unindexableFieldSuffix);
+      }
+    }
+
+    // Validate that none of the columns in the schema overlap with the fields in fieldPathsToDrop
+    Set<String> fieldPathsToDrop = transformerConfig.getFieldPathsToDrop();
+    if (null != fieldPathsToDrop) {
+      Set<String> fieldIntersection = new HashSet<>(schemaFields);
+      fieldIntersection.retainAll(fieldPathsToDrop);
+      Preconditions.checkState(fieldIntersection.isEmpty(), "Fields in schema 
overlap with fieldPathsToDrop");
+    }
+  }
+
+  /**
+   * Validates the schema with a {@link SchemaConformingTransformerV2Config} 
instance and creates a tree representing
+   * the fields in the schema to be used when transforming input records. 
Refer to {@link SchemaTreeNode} for details.
+   * @throws IllegalArgumentException if schema validation fails in:
+   * <ul>
+   *   <li>One of the fields in the schema has a name which when interpreted 
as a JSON path, corresponds to an object
+   *   with an empty sub-key. E.g., the field name "a..b" corresponds to the 
JSON {"a": {"": {"b": ...}}}</li>
+   * </ul>
+   */
+  private static SchemaTreeNode validateSchemaAndCreateTree(@Nonnull Schema 
schema,
+      @Nonnull SchemaConformingTransformerV2Config transformerConfig)
+      throws IllegalArgumentException {
+    Set<String> schemaFields = schema.getPhysicalColumnNames();
+    Map<String, String> jsonKeyPathToColumnNameMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : 
transformerConfig.getColumnNameToJsonKeyPathMap().entrySet()) {
+      String columnName = entry.getKey();
+      String jsonKeyPath = entry.getValue();
+      schemaFields.remove(columnName);
+      schemaFields.add(jsonKeyPath);
+      jsonKeyPathToColumnNameMap.put(jsonKeyPath, columnName);
+    }
+
+    SchemaTreeNode rootNode = new SchemaTreeNode("", null, schema);
+    List<String> subKeys = new ArrayList<>();
+    for (String field : schemaFields) {
+      SchemaTreeNode currentNode = rootNode;
+      int keySeparatorIdx = field.indexOf(JsonUtils.KEY_SEPARATOR);
+      if (-1 == keySeparatorIdx) {
+        // Not a flattened key
+        currentNode = rootNode.getAndCreateChild(field, schema);
+      } else {
+        subKeys.clear();
+        SchemaConformingTransformer.getAndValidateSubKeys(field, 
keySeparatorIdx, subKeys);
+        for (String subKey : subKeys) {
+          SchemaTreeNode childNode = currentNode.getAndCreateChild(subKey, 
schema);
+          currentNode = childNode;
+        }
+      }
+      currentNode.setColumn(jsonKeyPathToColumnNameMap.get(field));
+    }
+
+    return rootNode;
+  }
+
+  @Override
+  public boolean isNoOp() {
+    return null == _transformerConfig;
+  }
+
+  @Nullable
+  @Override
+  public GenericRow transform(GenericRow record) {
+    GenericRow outputRecord = new GenericRow();
+    Map<String, Object> mergedTextIndexMap = new HashMap<>();
+
+    try {
+      Deque<String> jsonPath = new ArrayDeque<>();
+      ExtraFieldsContainer extraFieldsContainer =
+          new ExtraFieldsContainer(null != 
_transformerConfig.getUnindexableExtrasField());
+      for (Map.Entry<String, Object> recordEntry : 
record.getFieldToValueMap().entrySet()) {
+        String recordKey = recordEntry.getKey();
+        Object recordValue = recordEntry.getValue();
+        jsonPath.addLast(recordKey);
+        ExtraFieldsContainer currentFieldsContainer =
+            processField(_schemaTree, jsonPath, recordValue, true, 
outputRecord, mergedTextIndexMap);
+        extraFieldsContainer.addChild(currentFieldsContainer);
+        jsonPath.removeLast();
+      }
+      putExtrasField(_transformerConfig.getIndexableExtrasField(), 
_indexableExtrasFieldType,
+          extraFieldsContainer.getIndexableExtras(), outputRecord);
+      putExtrasField(_transformerConfig.getUnindexableExtrasField(), 
_unindexableExtrasFieldType,
+          extraFieldsContainer.getUnindexableExtras(), outputRecord);
+
+      // Generate merged text index
+      if (null != _mergedTextIndexFieldSpec && !mergedTextIndexMap.isEmpty()) {
+        List<String> luceneDocuments = 
getLuceneDocumentsFromMergedTextIndexMap(mergedTextIndexMap);
+        if (_mergedTextIndexFieldSpec.isSingleValueField()) {
+          outputRecord.putValue(_transformerConfig.getMergedTextIndexField(), 
String.join(" ", luceneDocuments));
+        } else {
+          outputRecord.putValue(_transformerConfig.getMergedTextIndexField(), 
luceneDocuments);
+        }
+      }
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw e;
+      }
+      _logger.error("Couldn't transform record: {}", record.toString(), e);
+      outputRecord.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
+    }
+
+    return outputRecord;
+  }
+
+  /**
+   * The method traverses the record and the schema tree at the same time. It checks each record key/value pair
+   * against the corresponding schema tree node and {@link SchemaConformingTransformerV2Config}, and finally drops
+   * the pair or puts it into the output record with the following logic.
+   * Take the example:
+   * {
+   *   "a": 1,
+   *   "b": {
+   *     "c": 2,
+   *     "d": 3,
+   *     "d_noIdx": 4
+   *   },
+   *   "b_noIdx": {
+   *     "c": 5,
+   *     "d": 6,
+   *   }
+   * }
+   * with column "a", "b", "b.c" in schema
+   * There are two types of output:
+   *  - flattened keys with values, e.g.,
+   *    - keyPath as column and value as leaf node, e.g., "a": 1, "b.c": 2. 
However, "b" is not a leaf node, so it would
+   *    be skipped
+   *    - __mergedTextIndex storing ["1:a", "2:b.c", "3:b.d"] as a string array
+   *  - structured Json format, e.g.,
+   *    - indexableFields/json_data: {"a": 1, "b": {"c": 2, "d": 3}}
+   *    - unindexableFields/json_data_noIdx: {"b": {"d_noIdx": 4} ,"b_noIdx": 
{"c": 5, "d": 6}}
+   * Expected behavior:
+   *  - If the current key is special, it is added to the outputRecord and its subtree is skipped
+   *  - If the keyJsonPath is in fieldPathsToDrop, it and its subtree are skipped
+   *  - At leaf node (base case in recursion):
+   *    - Parse keyPath and value and add as flattened result to outputRecord
+   *    - Return structured fields as ExtraFieldsContainer
+   *   (a leaf node is defined as a node that is not of "Map" type. A leaf node may be a collection or an array of
+   *   "Map"s, but for simplicity we still treat it as a leaf node and do not traverse its children)
+   *  - For non-leaf node
+   *    - Construct ExtraFieldsContainer based on children's result and return
+   *
+   * @param parentNode The parent node in the schema tree, which might or might not have a child with the given
+   *                  key. If parentNode is null, the current key is outside the schema tree.
+   * @param jsonPath The key json path split by "."
+   * @param value The value of the current field
+   * @param isIndexable Whether the current field is indexable
+   * @param outputRecord The output record updated during traversal
+   * @param mergedTextIndexMap The merged text index map updated during traversal
+   * @return an ExtraFieldsContainer carrying the indexable and unindexable fields of the current node as well as
+   * its subtree
+   */
+  private ExtraFieldsContainer processField(SchemaTreeNode parentNode, 
Deque<String> jsonPath, Object value,
+      boolean isIndexable, GenericRow outputRecord, Map<String, Object> 
mergedTextIndexMap) {
+    // Common variables
+    boolean storeIndexableExtras = 
_transformerConfig.getIndexableExtrasField() != null;
+    boolean storeUnindexableExtras = 
_transformerConfig.getUnindexableExtrasField() != null;
+    String key = jsonPath.peekLast();
+    ExtraFieldsContainer extraFieldsContainer = new 
ExtraFieldsContainer(storeUnindexableExtras);
+
+    // Base case
+    if (StreamDataDecoderImpl.isSpecialKeyType(key) || 
GenericRow.isSpecialKeyType(key)) {
+      outputRecord.putValue(key, value);
+      return extraFieldsContainer;
+    }
+
+    String keyJsonPath = String.join(".", jsonPath);
+    if 
(_transformerConfig.getFieldPathsToPreserveInput().contains(keyJsonPath)) {
+      outputRecord.putValue(keyJsonPath, value);
+      return extraFieldsContainer;
+    }
+
+    Set<String> fieldPathsToDrop = _transformerConfig.getFieldPathsToDrop();
+    if (null != fieldPathsToDrop && fieldPathsToDrop.contains(keyJsonPath)) {
+      return extraFieldsContainer;
+    }
+
+    SchemaTreeNode currentNode = parentNode == null ? null : 
parentNode.getChild(key);
+    String unindexableFieldSuffix = 
_transformerConfig.getUnindexableFieldSuffix();
+    isIndexable = isIndexable && (null == unindexableFieldSuffix || 
!key.endsWith(unindexableFieldSuffix));
+    if (!(value instanceof Map)) {
+      // leaf node
+      if (!isIndexable) {
+        extraFieldsContainer.addUnindexableEntry(key, value);
+      } else {
+        if (null != currentNode && currentNode.isColumn()) {
+          // In schema
+          outputRecord.putValue(currentNode.getColumnName(), 
currentNode.getValue(value));
+          if 
(_transformerConfig.getFieldsToDoubleIngest().contains(keyJsonPath)) {
+            extraFieldsContainer.addIndexableEntry(key, value);
+          }
+          mergedTextIndexMap.put(keyJsonPath, value);
+        } else {
+          // Out of schema
+          if (storeIndexableExtras) {
+            extraFieldsContainer.addIndexableEntry(key, value);
+            mergedTextIndexMap.put(keyJsonPath, value);
+          }
+        }
+      }
+      return extraFieldsContainer;
+    }
+    // Traverse the subtree
+    Map<String, Object> valueAsMap = (Map<String, Object>) value;
+    for (Map.Entry<String, Object> entry : valueAsMap.entrySet()) {
+      jsonPath.addLast(entry.getKey());
+      ExtraFieldsContainer childContainer =
+          processField(currentNode, jsonPath, entry.getValue(), isIndexable, 
outputRecord, mergedTextIndexMap);
+      extraFieldsContainer.addChild(key, childContainer);
+      jsonPath.removeLast();
+    }
+    return extraFieldsContainer;
+  }
+
+  /**
+   * Generate a Lucene document based on the provided key-value pair.
+   * The index document follows this format: "val:key".
+   * @param kv                               used to generate text index 
documents
+   * @param indexDocuments                   a list to store the generated 
index documents
+   * @param mergedTextIndexDocumentMaxLength which we enforce via truncation 
during document generation
+   */
+  public void generateTextIndexLuceneDocument(Map.Entry<String, Object> kv, 
List<String> indexDocuments,
+      Integer mergedTextIndexDocumentMaxLength) {
+    String key = kv.getKey();
+    String val;
+    // To avoid redundant leading and trailing '"', only convert to JSON string if the value is a list or an array
+    if (kv.getValue() instanceof Collection || kv.getValue() instanceof 
Object[]) {
+      try {
+        val = JsonUtils.objectToString(kv.getValue());
+      } catch (JsonProcessingException e) {
+        val = kv.getValue().toString();
+      }
+    } else {
+      val = kv.getValue().toString();
+    }
+
+    // TODO: theoretically, the key length + 1 could cause integer overflow. 
But in reality, upstream message size
+    //  limit usually could not reach that high. We should revisit this if we 
see any issue.
+    if (key.length() + 1 > MAXIMUM_LUCENE_DOCUMENT_SIZE) {
+      _logger.error("The provided key's length is too long, text index 
document cannot be truncated");
+      return;
+    }
+
+    // Truncate the value to ensure the generated index document is no longer than mergedTextIndexDocumentMaxLength
+    // The value length should be the mergedTextIndexDocumentMaxLength minus 
":" character (length 1) minus key length
+    int valueTruncationLength = mergedTextIndexDocumentMaxLength - 1 - 
key.length();
+    if (val.length() > valueTruncationLength) {
+      _realtimeMergedTextIndexTruncatedDocumentSizeMeter = _serverMetrics
+          .addMeteredTableValue(_tableName, 
ServerMeter.REALTIME_MERGED_TEXT_IDX_TRUNCATED_DOCUMENT_SIZE,
+              key.length() + 1 + val.length(), 
_realtimeMergedTextIndexTruncatedDocumentSizeMeter);
+      val = val.substring(0, valueTruncationLength);
+    }
+
+    _mergedTextIndexDocumentBytesCount += key.length() + 1 + val.length();
+    _mergedTextIndexDocumentCount += 1;
+    _serverMetrics.setValueOfTableGauge(_tableName, 
ServerGauge.REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN,
+        _mergedTextIndexDocumentBytesCount / _mergedTextIndexDocumentCount);
+
+    indexDocuments.add(val + ":" + key);
+  }
+
+  /**
+   * Implement shingling for the merged text index based on the provided 
key-value pair.
+   * Each shingled index document retains the format of a standard index 
document: "val:key". However, "val" now
+   * denotes a sliding window of characters over the value. The total length of each shingled index document
+   * (key length + shingled value length + 1) must be less than or equal to shingleIndexMaxLength. The sliding
+   * window is advanced so that adjacent shingled documents share shingleIndexOverlapLength characters of the
+   * value. All shingle index documents, except for the last one, should have the maximum possible length. If the
+   * minimum document length (shingling overlap length + key length + 1) exceeds the maximum Lucene document size
+   * (MAXIMUM_LUCENE_DOCUMENT_SIZE), shingling is disabled, and the value is truncated to match the maximum Lucene
+   * document size. If shingleIndexMaxLength is lower than the required minimum document length and also lower
+   * than the maximum Lucene document size, shingleIndexMaxLength is adjusted to match the maximum Lucene
+   * document size.
+   *
+   * Note that the most important parameter, the shingleIndexOverlapLength, is 
the maximum search length that will yield
+   * results with 100% accuracy.
+   *
+   * Example: key-> "key", value-> "0123456789ABCDEF", max length: 10, 
shingling overlap length: 3
+   * Generated documents:
+   * 012345:key
+   *    345678:key
+   *       6789AB:key
+   *          9ABCDE:key
+   *             CDEF:key
+   * Any query with a length of 7 will yield no results, such as "0123456" or 
"6789ABC".
+   * Any query with a length of 3 will yield results with 100% accuracy (i.e. 
is always guaranteed to be searchable).
+   * Any query with a length between 4 and 6 (inclusive) has indeterminate 
accuracy.
+   * E.g. for queries with length 5, "12345", "789AB" will hit, while "23456" 
will miss.
+   *
+   * @param kv                        used to generate shingle text index 
documents
+   * @param shingleIndexDocuments        a list to store the generated shingle 
index documents
+   * @param shingleIndexMaxLength     the maximum length of each shingle index 
document. Needs to be greater than the
+   *                                  length of the key and 
shingleIndexOverlapLength + 1, and must be lower or equal
+   *                                  to MAXIMUM_LUCENE_DOCUMENT_SIZE.
+   * @param shingleIndexOverlapLength the number of characters in the 
kv-pair's value shared by two adjacent shingle
+   *                                  index documents. If null, the overlap 
length will be defaulted to half of the max
+   *                                  document length.
+   */
+  public void generateShingleTextIndexDocument(Map.Entry<String, Object> kv, 
List<String> shingleIndexDocuments,
+      int shingleIndexMaxLength, int shingleIndexOverlapLength) {
+    String key = kv.getKey();
+    String val;
+    // To avoid redundant leading and trailing '"', only convert to JSON string if the value is a list or an array
+    if (kv.getValue() instanceof Collection || kv.getValue() instanceof 
Object[]) {
+      try {
+        val = JsonUtils.objectToString(kv.getValue());
+      } catch (JsonProcessingException e) {
+        val = kv.getValue().toString();
+      }
+    } else {
+      val = kv.getValue().toString();
+    }
+    final int valLength = val.length();
+    final int documentSuffixLength = key.length() + 1;
+    final int minDocumentLength = documentSuffixLength + 
shingleIndexOverlapLength + 1;
+
+    if (shingleIndexOverlapLength >= valLength) {
+      if (_logger.isDebugEnabled()) {
+        _logger.debug("The shingleIndexOverlapLength " + shingleIndexOverlapLength
+            + " is longer than the value length " + valLength
+            + ". Shingling will not be applied since only one document will be generated.");
+      }
+      generateTextIndexLuceneDocument(kv, shingleIndexDocuments, 
shingleIndexMaxLength);
+      return;
+    }
+
+    if (minDocumentLength > MAXIMUM_LUCENE_DOCUMENT_SIZE) {
+      _logger.debug("The minimum document length " + minDocumentLength + " (" 
+ MIN_DOCUMENT_LENGTH_DESCRIPTION + ") "
+          + " exceeds the limit of maximum Lucene document size " + 
MAXIMUM_LUCENE_DOCUMENT_SIZE + ". Value will be "
+          + "truncated and shingling will not be applied.");
+      generateTextIndexLuceneDocument(kv, shingleIndexDocuments, 
shingleIndexMaxLength);
+      return;
+    }
+
+    // This logging becomes expensive if the user accidentally sets a very low shingleIndexMaxLength
+    if (shingleIndexMaxLength < minDocumentLength) {
+      _logger.debug("The shingleIndexMaxLength " + shingleIndexMaxLength + " 
is smaller than the minimum document "
+          + "length " + minDocumentLength + " (" + 
MIN_DOCUMENT_LENGTH_DESCRIPTION + "). Increasing the "
+          + "shingleIndexMaxLength to maximum Lucene document size " + 
MAXIMUM_LUCENE_DOCUMENT_SIZE + ".");
+      shingleIndexMaxLength = MAXIMUM_LUCENE_DOCUMENT_SIZE;
+    }
+
+    // The shingle window slide length is the number of positions on the value that we advance in every iteration.
+    // We ensure shingleIndexMaxLength >= minDocumentLength so that shingleWindowSlideLength >= 1.
+    int shingleWindowSlideLength = shingleIndexMaxLength - 
shingleIndexOverlapLength - documentSuffixLength;
+
+    // Generate shingle index documents
+    // When starting_idx + shingleIndexOverlapLength >= valLength, there are no new characters to capture, so we
+    // stop the shingle document generation loop.
+    // We ensure that shingleIndexOverlapLength < valLength so that this loop will be entered at least once.
+    for (int i = 0; i + shingleIndexOverlapLength < valLength; i += 
shingleWindowSlideLength) {
+      String documentValStr = val.substring(i, Math.min(i + 
shingleIndexMaxLength - documentSuffixLength, valLength));
+      String shingleIndexDocument = documentValStr + ":" + key;
+      shingleIndexDocuments.add(shingleIndexDocument);
+      _mergedTextIndexDocumentBytesCount += shingleIndexDocument.length();
+      ++_mergedTextIndexDocumentCount;
+    }
+    _serverMetrics.setValueOfTableGauge(_tableName, 
ServerGauge.REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN,
+        _mergedTextIndexDocumentBytesCount / _mergedTextIndexDocumentCount);
+  }
+
+  /**
+   * Converts (if necessary) and adds the given extras field to the output 
record
+   */
+  private void putExtrasField(String fieldName, DataType fieldType, 
Map<String, Object> field,
+      GenericRow outputRecord) {
+    if (null == field) {
+      return;
+    }
+
+    switch (fieldType) {
+      case JSON:
+        outputRecord.putValue(fieldName, field);
+        break;
+      case STRING:
+        try {
+          outputRecord.putValue(fieldName, JsonUtils.objectToString(field));
+        } catch (JsonProcessingException e) {
+          throw new RuntimeException("Failed to convert '" + fieldName + "' to 
string", e);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot convert '" + fieldName 
+ "' to " + fieldType.name());
+    }
+  }
+
+  private List<String> getLuceneDocumentsFromMergedTextIndexMap(Map<String, 
Object> mergedTextIndexMap) {
+    final Integer mergedTextIndexDocumentMaxLength = 
_transformerConfig.getMergedTextIndexDocumentMaxLength();
+    @Nullable
+    final Integer mergedTextIndexShinglingOverlapLength =
+        _transformerConfig.getMergedTextIndexShinglingOverlapLength();
+    List<String> luceneDocuments = new ArrayList<>();
+    mergedTextIndexMap.entrySet().stream().filter(kv -> null != kv.getKey() && 
null != kv.getValue())
+        .filter(kv -> 
!_transformerConfig.getMergedTextIndexPathToExclude().contains(kv.getKey())).filter(
+        kv -> !base64ValueFilter(kv.getValue().toString().getBytes(),
+            
_transformerConfig.getMergedTextIndexBinaryDocumentDetectionMinLength())).filter(
+        kv -> _transformerConfig.getMergedTextIndexSuffixToExclude().stream()
+            .noneMatch(suffix -> kv.getKey().endsWith(suffix))).forEach(kv -> {
+      if (null == mergedTextIndexShinglingOverlapLength) {
+        generateTextIndexLuceneDocument(kv, luceneDocuments, 
mergedTextIndexDocumentMaxLength);
+      } else {
+        generateShingleTextIndexDocument(kv, luceneDocuments, 
mergedTextIndexDocumentMaxLength,
+            mergedTextIndexShinglingOverlapLength);
+      }
+    });
+    return luceneDocuments;
+  }
+}
+
+/**
+ * SchemaTreeNode represents a node in the schema tree we construct. A node can be either a leaf or a non-leaf
+ * node, and either kind may map to a column in the schema.
+ * For example, the schema with columns a, b, c, d.e, d.f, x.y, x.y.z, x.y.w 
will have the following tree structure:
+ * root -- a*
+ *      -- b*
+ *      -- c*
+ *      -- d -- e*
+ *           -- f*
+ *      -- x -- y* -- z*
+ *                 -- w*
+ * where node with "*" could represent a valid column in the schema.
+ */
+class SchemaTreeNode {
+  private boolean _isColumn;
+  private Map<String, SchemaTreeNode> _children;
+  // Taking the example of key "x.y.z", the keyName will be "z" and the 
parentPath will be "x.y"
+  // Root node would have keyName as "" and parentPath as null
+  // Root node's children will have keyName as the first level key and 
parentPath as ""
+  @Nonnull
+  private String _keyName;
+  @Nullable
+  private String _columnName;
+  @Nullable
+  private String _parentPath;
+  private FieldSpec _fieldSpec;
+
+  public SchemaTreeNode(String keyName, String parentPath, Schema schema) {
+    _keyName = keyName;
+    _parentPath = parentPath;
+    _fieldSpec = schema.getFieldSpecFor(getJsonKeyPath());
+    _children = new HashMap<>();
+  }
+
+  public boolean isColumn() {
+    return _isColumn;
+  }
+
+  public void setColumn(String columnName) {
+    if (columnName == null) {
+      _columnName = getJsonKeyPath();
+    } else {
+      _columnName = columnName;
+    }
+    _isColumn = true;
+  }
+
+  public boolean hasChild(String key) {
+    return _children.containsKey(key);
+  }
+
+  /**
+   * If the current node does not have a child for the given key, add a child node and return it.
+   * If the child node already exists, return the existing child node.
+   * @param key the sub-key of the child node
+   * @return the existing or newly created child node
+   */
+  public SchemaTreeNode getAndCreateChild(String key, Schema schema) {
+    SchemaTreeNode child = _children.get(key);
+    if (child == null) {
+      child = new SchemaTreeNode(key, getJsonKeyPath(), schema);
+      _children.put(key, child);
+    }
+    return child;
+  }
+
+  public SchemaTreeNode getChild(String key) {
+    return _children.get(key);
+  }
+
+  public String getKeyName() {
+    return _keyName;
+  }
+
+  public String getColumnName() {
+    return _columnName;
+  }
+
+  public Object getValue(Object value) {
+    // {@link DataTypeTransformer} does not allow a collection or an array as the input value for a single-value
+    // field. To prevent the error, we serialize the value to a string if the field is of STRING type.
+    if (_fieldSpec != null && _fieldSpec.getDataType() == DataType.STRING && 
_fieldSpec.isSingleValueField()) {
+      try {
+        if (value instanceof Collection) {
+          return JsonUtils.objectToString(value);
+        }
+        if (value instanceof Object[]) {
+          return JsonUtils.objectToString(Arrays.asList((Object[]) value));
+        }
+      } catch (JsonProcessingException e) {
+        return value.toString();
+      }
+    }
+    return value;
+  }
+
+  public String getJsonKeyPath() {
+    if (_parentPath == null || _parentPath.isEmpty()) {
+      return _keyName;
+    }
+    return _parentPath + JsonUtils.KEY_SEPARATOR + _keyName;
+  }
+}
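
To enable the transformer, a SchemaConformingTransformerV2Config is attached to the table's IngestionConfig. A minimal sketch mirroring createDefaultBasicTableConfig() in the unit test added below; the table name and extras field names are illustrative:

    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
    import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class EnableTransformerSketch {
      public static TableConfig buildTableConfig() {
        IngestionConfig ingestionConfig = new IngestionConfig();
        // Argument order follows the constructor call in the unit test: indexable
        // extras on/field, unindexable extras on/field, no-index suffix, then the
        // remaining optional settings left as null defaults.
        ingestionConfig.setSchemaConformingTransformerV2Config(
            new SchemaConformingTransformerV2Config(true, "json_data", true, "json_data_no_idx",
                "_noIndex", null, null, null, null, null, null, null, null));
        return new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
            .setIngestionConfig(ingestionConfig).build();
      }
    }
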
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/Base64Utils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/Base64Utils.java
new file mode 100644
index 0000000000..12a1652a24
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/Base64Utils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import org.apache.commons.codec.binary.Base64;
+
+/**
+ * Simple wrapper class over codec's Base64 implementation to handle 
Pinot-specific Base64 encoded binary data.
+ */
+public class Base64Utils extends Base64 {
+  public static boolean isBase64IgnoreWhiteSpace(byte[] arrayOctet) {
+    return isBase64(arrayOctet);
+  }
+
+  public static boolean isBase64IgnoreTrailingPeriods(byte[] arrayOctet) {
+    int i = arrayOctet.length - 1;
+    while (i >= 0 && '.' == arrayOctet[i]) {
+      --i;
+    }
+    while (i >= 0) {
+      if (!isBase64(arrayOctet[i])) {
+        return false;
+      }
+      --i;
+    }
+    return true;
+  }
+}
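
A small usage sketch of the helper above, with illustrative inputs; trailing periods are skipped before the per-byte alphabet check:

    import java.nio.charset.StandardCharsets;
    import org.apache.pinot.segment.local.utils.Base64Utils;

    public class Base64UtilsSketch {
      public static void main(String[] args) {
        // A base64 payload with trailing periods still passes the check.
        byte[] encoded = "SGVsbG8gUGlub3Q=..".getBytes(StandardCharsets.UTF_8);
        System.out.println(Base64Utils.isBase64IgnoreTrailingPeriods(encoded)); // true

        // Plain text with characters outside the base64 alphabet fails.
        byte[] plain = "not base64!".getBytes(StandardCharsets.UTF_8);
        System.out.println(Base64Utils.isBase64IgnoreTrailingPeriods(plain)); // false
      }
    }
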
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index 203d7d930a..556b025855 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -310,7 +310,8 @@ public final class IngestionUtils {
   public static Set<String> getFieldsForRecordExtractor(@Nullable 
IngestionConfig ingestionConfig, Schema schema) {
     Set<String> fieldsForRecordExtractor = new HashSet<>();
 
-    if (null != ingestionConfig && null != 
ingestionConfig.getSchemaConformingTransformerConfig()) {
+    if (null != ingestionConfig && (null != 
ingestionConfig.getSchemaConformingTransformerConfig()
+        || null != ingestionConfig.getSchemaConformingTransformerV2Config())) {
       // The SchemaConformingTransformer requires that all fields are 
extracted, indicated by returning an empty set
       // here. Compared to extracting the fields specified below, extracting 
all fields should be a superset.
       return fieldsForRecordExtractor;
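
As the comment above notes, an empty set is the "extract all fields" signal. A caller-side sketch with a hypothetical helper name:

    import java.util.Set;
    import org.apache.pinot.segment.local.utils.IngestionUtils;
    import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
    import org.apache.pinot.spi.data.Schema;

    public class ExtractorFieldsSketch {
      // Hypothetical helper: an empty result means "extract every field",
      // not "extract nothing".
      static boolean shouldExtractAllFields(IngestionConfig ingestionConfig, Schema schema) {
        Set<String> fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema);
        return fields.isEmpty();
      }
    }
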
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 8d31f5d399..14c4040a60 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -46,6 +46,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
 import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
+import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformerV2;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
@@ -77,6 +78,7 @@ import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
+import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -574,6 +576,12 @@ public final class TableConfigUtils {
       if (null != schemaConformingTransformerConfig && null != schema) {
         SchemaConformingTransformer.validateSchema(schema, 
schemaConformingTransformerConfig);
       }
+
+      SchemaConformingTransformerV2Config schemaConformingTransformerV2Config =
+          ingestionConfig.getSchemaConformingTransformerV2Config();
+      if (null != schemaConformingTransformerV2Config && null != schema) {
+        SchemaConformingTransformerV2.validateSchema(schema, 
schemaConformingTransformerV2Config);
+      }
     }
   }
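
With this hook in place, table config validation fails fast on conflicting schemas. A minimal sketch, assuming an illustrative "_noIndex" suffix; validateSchemaFieldNames above rejects any schema column ending with the no-index suffix:

    import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformerV2;
    import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config;
    import org.apache.pinot.spi.data.FieldSpec.DataType;
    import org.apache.pinot.spi.data.Schema;

    public class ValidateSchemaSketch {
      public static void main(String[] args) {
        Schema schema = new Schema.SchemaBuilder().setSchemaName("myTable")
            .addSingleValueDimension("json_data", DataType.JSON)
            .addSingleValueDimension("myField_noIndex", DataType.STRING)  // clashes with suffix
            .build();
        SchemaConformingTransformerV2Config config = new SchemaConformingTransformerV2Config(
            true, "json_data", false, null, "_noIndex", null, null, null, null, null, null, null, null);
        // Expected to throw IllegalStateException:
        // "Field 'myField_noIndex' has no-index suffix '_noIndex'"
        SchemaConformingTransformerV2.validateSchema(schema, config);
      }
    }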
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java
new file mode 100644
index 0000000000..6189f14d42
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/SchemaConformingTransformerV2Test.java
@@ -0,0 +1,934 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.NumericNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerV2Config;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+
+public class SchemaConformingTransformerV2Test {
+  private static final String INDEXABLE_EXTRAS_FIELD_NAME = "json_data";
+  private static final String UNINDEXABLE_EXTRAS_FIELD_NAME = 
"json_data_no_idx";
+  private static final String UNINDEXABLE_FIELD_SUFFIX = "_noIndex";
+  private static final String MERGED_TEXT_INDEX_FIELD_NAME = 
"__mergedTextIndex";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final JsonNodeFactory N = OBJECT_MAPPER.getNodeFactory();
+  private static final String TEST_JSON_ARRAY_FIELD_NAME = "arrayField";
+  private static final String TEST_JSON_NULL_FIELD_NAME = "nullField";
+  private static final String TEST_JSON_STRING_FIELD_NAME = "stringField";
+  private static final String TEST_JSON_MAP_FIELD_NAME = "mapField";
+  private static final String TEST_JSON_MAP_NO_IDX_FIELD_NAME = 
"mapField_noIndex";
+  private static final String TEST_JSON_NESTED_MAP_FIELD_NAME = "nestedFields";
+  private static final String TEST_JSON_INT_NO_IDX_FIELD_NAME = 
"intField_noIndex";
+  private static final String TEST_JSON_STRING_NO_IDX_FIELD_NAME = 
"stringField_noIndex";
+  private static final ArrayNode TEST_JSON_ARRAY_NODE = 
N.arrayNode().add(0).add(1).add(2).add(3);
+  private static final NullNode TEST_JSON_NULL_NODE = N.nullNode();
+  private static final TextNode TEST_JSON_STRING_NODE = N.textNode("a");
+  private static final NumericNode TEST_INT_NODE = N.numberNode(9);
+  private static final TextNode TEST_JSON_STRING_NO_IDX_NODE = N.textNode("z");
+  private static final CustomObjectNode TEST_JSON_MAP_NODE =
+      CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, 
TEST_JSON_ARRAY_NODE)
+          .set(TEST_JSON_NULL_FIELD_NAME, 
TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE);
+  private static final CustomObjectNode TEST_JSON_MAP_NO_IDX_NODE =
+      CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, 
TEST_INT_NODE)
+          .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, 
TEST_JSON_STRING_NO_IDX_NODE);
+  private static final CustomObjectNode TEST_JSON_MAP_NODE_WITH_NO_IDX =
+      CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, 
TEST_JSON_ARRAY_NODE)
+          .set(TEST_JSON_NULL_FIELD_NAME, 
TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+          .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+          .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, 
TEST_JSON_STRING_NO_IDX_NODE);
+  static {
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+  private static final SchemaConformingTransformerV2 _RECORD_TRANSFORMER =
+      new SchemaConformingTransformerV2(createDefaultBasicTableConfig(), 
createDefaultSchema());
+
+  private static TableConfig createDefaultBasicTableConfig() {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    SchemaConformingTransformerV2Config schemaConformingTransformerV2Config =
+        new SchemaConformingTransformerV2Config(true, 
INDEXABLE_EXTRAS_FIELD_NAME, true, UNINDEXABLE_EXTRAS_FIELD_NAME,
+            UNINDEXABLE_FIELD_SUFFIX, null, null, null, null, null, null, 
null, null);
+    
ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config);
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
+        .build();
+  }
+
+  private static TableConfig createDefaultTableConfig(String 
indexableExtrasField, String unindexableExtrasField,
+      String unindexableFieldSuffix, Set<String> fieldPathsToDrop, Set<String> 
fieldPathsToPreserve,
+      String mergedTextIndexField) {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    SchemaConformingTransformerV2Config schemaConformingTransformerV2Config =
+        new SchemaConformingTransformerV2Config(indexableExtrasField != null, 
indexableExtrasField,
+            unindexableExtrasField != null, unindexableExtrasField, 
unindexableFieldSuffix, fieldPathsToDrop,
+            fieldPathsToPreserve, mergedTextIndexField, null, null, null, 
null, null);
+    
ingestionConfig.setSchemaConformingTransformerV2Config(schemaConformingTransformerV2Config);
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig)
+        .build();
+  }
+
+  private static Schema createDefaultSchema() {
+    return createDefaultSchemaBuilder().addSingleValueDimension("intField", 
DataType.INT).build();
+  }
+
+  private static Schema.SchemaBuilder createDefaultSchemaBuilder() {
+    return new Schema.SchemaBuilder().addSingleValueDimension(INDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON)
+        .addSingleValueDimension(UNINDEXABLE_EXTRAS_FIELD_NAME, DataType.JSON);
+  }
+
+  @Test
+  public void testWithNoUnindexableFields() {
+    /*
+    {
+      "arrayField" : [ 0, 1, 2, 3 ],
+      "nullField" : null,
+      "stringField" : "a",
+      "mapField" : {
+        "arrayField" : [ 0, 1, 2, 3 ],
+        "nullField" : null,
+        "stringField" : "a"
+      },
+      "nestedField" : {
+        "arrayField" : [ 0, 1, 2, 3 ],
+        "nullField" : null,
+        "stringField" : "a",
+        "mapField" : {
+          "arrayField" : [ 0, 1, 2, 3 ],
+          "nullField" : null,
+          "stringField" : "a"
+        }
+      }
+    }
+    */
+    final CustomObjectNode inputJsonNode =
+        CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)
+            .set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE));
+
+    CustomObjectNode expectedJsonNode;
+    Schema schema;
+
+    // No dedicated columns, everything moved under INDEXABLE_EXTRAS_FIELD_NAME
+    /*
+    {
+      "json_data" : {
+        "arrayField" : [ 0, 1, 2, 3 ],
+        "nullField" : null,
+        "stringField" : "a",
+        "mapField" : {
+          "arrayField" : [ 0, 1, 2, 3 ],
+          "nullField" : null,
+          "stringField" : "a"
+        },
+        "nestedField" : {
+          "arrayField" : [ 0, 1, 2, 3 ],
+          "nullField" : null,
+          "stringField" : "a",
+          "mapField" : {
+            "arrayField" : [ 0, 1, 2, 3 ],
+            "nullField" : null,
+            "stringField" : "a"
+          }
+        }
+      }
+    }
+    */
+    schema = createDefaultSchemaBuilder().build();
+    expectedJsonNode = CustomObjectNode.create().set(INDEXABLE_EXTRAS_FIELD_NAME, inputJsonNode);
+    transformWithIndexableFields(schema, inputJsonNode, expectedJsonNode);
+
+    // Three dedicated columns in schema, only two are populated, one ignored
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nestedFields.stringField":"a",
+      "<indexableExtras>":{
+        "mapField": {
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        },
+        "nullField":null,
+        "stringField":"a",
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      }
+    }
+    */
+    schema = createDefaultSchemaBuilder().addMultiValueDimension(TEST_JSON_ARRAY_FIELD_NAME, DataType.INT)
+        .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING)
+        .build();
+    expectedJsonNode = CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+
+        .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)
+            .setAll(TEST_JSON_MAP_NODE.deepCopy().removeAndReturn(TEST_JSON_ARRAY_FIELD_NAME))
+            .set(TEST_JSON_NESTED_MAP_FIELD_NAME, CustomObjectNode.create()
+                .setAll(TEST_JSON_MAP_NODE.deepCopy().removeAndReturn(TEST_JSON_STRING_FIELD_NAME))
+                .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)));
+    transformWithIndexableFields(schema, inputJsonNode, expectedJsonNode);
+
+    // 8 dedicated columns, only 6 are populated
+    /*
+    {
+      "arrayField" : [ 0, 1, 2, 3 ],
+      "nullField" : null,
+      "stringField" : "a",
+      "nestedField.arrayField" : [ 0, 1, 2, 3 ],
+      "nestedField.nullField" : null,
+      "nestedField.stringField" : "a",
+      "json_data" : {
+        "mapField" : {
+          "arrayField" : [ 0, 1, 2, 3 ],
+          "nullField" : null,
+          "stringField" : "a"
+        },
+        "nestedField" : {
+          "mapField" : {
+            "arrayField" : [ 0, 1, 2, 3 ],
+            "nullField" : null,
+            "stringField" : "a"
+          }
+        }
+      }
+    }
+    */
+    schema = createDefaultSchemaBuilder().addMultiValueDimension(TEST_JSON_ARRAY_FIELD_NAME, DataType.INT)
+        .addSingleValueDimension(TEST_JSON_NULL_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_STRING_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.JSON)
+        .addMultiValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, DataType.INT)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_MAP_FIELD_NAME, DataType.JSON)
+        .build();
+    expectedJsonNode = CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+        .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)
+            .set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)));
+    transformWithIndexableFields(schema, inputJsonNode, expectedJsonNode);
+  }
+
+  @Test
+  public void testWithUnindexableFieldsAndMergedTextIndex() {
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "intField_noIndex":9,
+      "string_noIndex":"z",
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "intField_noIndex":9,
+        "string_noIndex":"z"
+      },
+      "mapField_noIndex":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+      },
+      "nestedFields":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        }
+      }
+    }
+    */
+    final CustomObjectNode inputJsonNode =
+        CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+            .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+            .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+            .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+            .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
+            .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+            CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+                .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+                .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+                .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX));
+
+    CustomObjectNode expectedJsonNode;
+    CustomObjectNode expectedJsonNodeWithMergedTextIndex;
+    Schema.SchemaBuilder schemaBuilder;
+
+    // No schema
+    schemaBuilder = createDefaultSchemaBuilder();
+    /*
+    {
+      "indexableExtras":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        },
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      },
+      "unindexableExtras":{
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        },
+        "mapField_noIndex":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+        },
+        "nestedFields":{
+          "intField_noIndex":9,
+          "string_noIndex":"z",
+          "mapField":{
+            "intField_noIndex":9,
+            "string_noIndex":"z"
+          }
+        }
+      },
+      __mergedTextIndex: [
+        "[0, 1, 2, 3]:arrayField", "a:stringField",
+        "[0, 1, 2, 3]:mapField.arrayField", "a:mapField.stringField",
+        "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField",
+        "[0, 1, 2, 3]:nestedFields.mapField.arrayField", 
"a:nestedFields.mapField.stringField",
+      ]
+    }
+    */
+    expectedJsonNode = CustomObjectNode.create().set(INDEXABLE_EXTRAS_FIELD_NAME,
+        CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+            .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+            .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+            CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+                .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+                .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+                .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)))
+
+        .set(UNINDEXABLE_EXTRAS_FIELD_NAME,
+            CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                    .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                    .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE)));
+    transformWithUnIndexableFieldsAndMergedTextIndex(schemaBuilder.build(), inputJsonNode, expectedJsonNode);
+
+    expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME,
+        N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:mapField.arrayField")
+            .add("a:mapField.stringField").add("[0,1,2,3]:nestedFields.arrayField").add("a:nestedFields.stringField")
+            .add("[0,1,2,3]:nestedFields.mapField.arrayField").add("a:nestedFields.mapField.stringField"));
+    transformWithUnIndexableFieldsAndMergedTextIndex(
+        schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), inputJsonNode,
+        expectedJsonNodeWithMergedTextIndex);
+
+    // With schema, mapField is not indexed
+    schemaBuilder = createDefaultSchemaBuilder().addMultiValueDimension("arrayField", DataType.INT)
+        .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME, DataType.JSON)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING);
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nestedFields.stringField":"a",
+      "indexableExtras":{
+        "nullField":null,
+        "stringField":"a",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        },
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      },
+      "unindexableExtras":{
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        },
+        "mapField_noIndex":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+        },
+        "nestedFields":{
+          "intField_noIndex":9,
+          "string_noIndex":"z",
+          "mapField":{
+            "intField_noIndex":9,
+            "string_noIndex":"z"
+          }
+        }
+      },
+      __mergedTextIndex: [
+        "[0, 1, 2, 3]:arrayField", "a:stringField",
+        "[0, 1, 2, 3]:mapField.arrayField", "a:mapField.stringField",
+        "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField",
+        "[0, 1, 2, 3]:nestedFields.mapField.arrayField", 
"a:nestedFields.mapField.stringField",
+      ]
+    }
+    */
+    expectedJsonNode = CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+        .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+            .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE).set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)
+            .set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+                    .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+                    .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)))
+
+        .set(UNINDEXABLE_EXTRAS_FIELD_NAME,
+            CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                    .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                    .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE)));
+    transformWithUnIndexableFieldsAndMergedTextIndex(schemaBuilder.build(), inputJsonNode, expectedJsonNode);
+
+    expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME,
+        N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:mapField.arrayField")
+            .add("a:mapField.stringField").add("[0,1,2,3]:nestedFields.arrayField").add("a:nestedFields.stringField")
+            .add("[0,1,2,3]:nestedFields.mapField.arrayField").add("a:nestedFields.mapField.stringField"));
+    transformWithUnIndexableFieldsAndMergedTextIndex(
+        schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), inputJsonNode,
+        expectedJsonNodeWithMergedTextIndex);
+
+    // With all fields in schema, but map field would not be indexed
+    schemaBuilder = createDefaultSchemaBuilder().addMultiValueDimension(TEST_JSON_ARRAY_FIELD_NAME, DataType.INT)
+        .addSingleValueDimension(TEST_JSON_NULL_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_STRING_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.JSON)
+        .addMultiValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, DataType.INT)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_MAP_FIELD_NAME, DataType.JSON);
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "nestedFields.arrayField":[0, 1, 2, 3],
+      "nestedFields.nullField":null,
+      "nestedFields.stringField":"a",
+      "indexableExtras":{
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a"
+        },
+        "nestedFields":{
+          mapField":{
+            "arrayField":[0, 1, 2, 3],
+            "nullField":null,
+            "stringField":"a"
+          }
+        }
+      },
+      "unindexableExtras":{
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        },
+        "mapField_noIndex":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+        },
+        "nestedFields":{
+          "intField_noIndex":9,
+          "string_noIndex":"z",
+          "mapField":{
+            "intField_noIndex":9,
+            "string_noIndex":"z"
+          }
+        }
+      },
+      __mergedTextIndex: [
+        "[0, 1, 2, 3]:arrayField", "a:stringField",
+        "[0, 1, 2, 3]:mapField.arrayField", "a:mapField.stringField",
+        "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField",
+        "[0, 1, 2, 3]:nestedFields.mapField.arrayField", 
"a:nestedFields.mapField.stringField",
+      ]
+    }
+    */
+    expectedJsonNode = CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+        .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+        .set(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+
+        .set(INDEXABLE_EXTRAS_FIELD_NAME, CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)
+            .set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE)))
+
+        .set(UNINDEXABLE_EXTRAS_FIELD_NAME,
+            CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                    .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                    .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NO_IDX_NODE)));
+    transformWithUnIndexableFieldsAndMergedTextIndex(schemaBuilder.build(), inputJsonNode, expectedJsonNode);
+    expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME,
+        N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:mapField.arrayField")
+            .add("a:mapField.stringField").add("[0,1,2,3]:nestedFields.arrayField").add("a:nestedFields.stringField")
+            .add("[0,1,2,3]:nestedFields.mapField.arrayField").add("a:nestedFields.mapField.stringField"));
+    transformWithUnIndexableFieldsAndMergedTextIndex(
+        schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), inputJsonNode,
+        expectedJsonNodeWithMergedTextIndex);
+  }
+
+  @Test
+  public void testKeyValueTransformation() {
+    /*
+    {
+      "arrayField":[0, 1, 2, 3],
+      "nullField":null,
+      "stringField":"a",
+      "intField_noIndex":9,
+      "string_noIndex":"z",
+      "mapField":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "intField_noIndex":9,
+        "string_noIndex":"z"
+      },
+      "mapField_noIndex":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+      },
+      "nestedFields":{
+        "arrayField":[0, 1, 2, 3],
+        "nullField":null,
+        "stringField":"a",
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        }
+      }
+    }
+    */
+    final CustomObjectNode inputJsonNode =
+        CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+            .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE).set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+            .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+            .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+            .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
+            .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+            CustomObjectNode.create().setAll(TEST_JSON_MAP_NODE).set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+                .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+                .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+                .set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX));
+
+    CustomObjectNode expectedJsonNode;
+    CustomObjectNode expectedJsonNodeWithMergedTextIndex;
+    Schema.SchemaBuilder schemaBuilder;
+
+    String destColumnName = "someMeaningfulName";
+    // Make the array field a single-value STRING column to test the conversion function,
+    // drop the path nestedFields.mapField, preserve the entire mapField value as-is,
+    // and map the column someMeaningfulName to nestedFields.stringField
+    schemaBuilder = createDefaultSchemaBuilder().addSingleValueDimension("arrayField", DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_MAP_FIELD_NAME, DataType.STRING)
+        .addSingleValueDimension(TEST_JSON_NESTED_MAP_FIELD_NAME, DataType.JSON)
+        .addSingleValueDimension(destColumnName, DataType.STRING);
+
+    Map<String, String> keyMapping = new HashMap<>() {
+      {
+        put(destColumnName, TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_STRING_FIELD_NAME);
+      }
+    };
+    Set<String> pathToDrop = new HashSet<>() {
+      {
+        add(TEST_JSON_NESTED_MAP_FIELD_NAME + "." + TEST_JSON_MAP_FIELD_NAME);
+      }
+    };
+    Set<String> pathToPreserve = new HashSet<>() {
+      {
+        add(TEST_JSON_MAP_FIELD_NAME);
+      }
+    };
+
+    /*
+    {
+      "arrayField":[0,1,2,3],
+      "nestedFields.stringField":"a",
+      "mapField":{
+        "arrayField":[0,1,2,3],
+        "nullField":null,
+        "stringField":"a",
+        "intField_noIndex":9,
+        "string_noIndex":"z"
+      },
+      "indexableExtras":{
+        "nullField":null,
+        "stringField":"a",
+        "nestedFields":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+        }
+      },
+      "unindexableExtras":{
+        "intField_noIndex":9,
+        "string_noIndex":"z",
+        "mapField_noIndex":{
+          "arrayField":[0, 1, 2, 3],
+          "nullField":null,
+          "stringField":"a",
+        },
+        "nestedFields":{
+          "intField_noIndex":9,
+          "string_noIndex":"z"
+        }
+      },
+      __mergedTextIndex: [
+        "[0, 1, 2, 3]:arrayField", "a:stringField",
+        "[0, 1, 2, 3]:nestedFields.arrayField", "a:nestedFields.stringField",
+      ]
+    }
+    */
+    expectedJsonNode = CustomObjectNode.create()
+        .set(TEST_JSON_ARRAY_FIELD_NAME, N.textNode("[0,1,2,3]"))
+        .set(destColumnName, TEST_JSON_STRING_NODE)
+        .set(TEST_JSON_MAP_FIELD_NAME, TEST_JSON_MAP_NODE_WITH_NO_IDX)
+        .set(INDEXABLE_EXTRAS_FIELD_NAME,
+            CustomObjectNode.create().set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)
+                .set(TEST_JSON_STRING_FIELD_NAME, TEST_JSON_STRING_NODE)
+                .set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                    CustomObjectNode.create().set(TEST_JSON_ARRAY_FIELD_NAME, TEST_JSON_ARRAY_NODE)
+                        .set(TEST_JSON_NULL_FIELD_NAME, TEST_JSON_NULL_NODE)))
+
+        .set(UNINDEXABLE_EXTRAS_FIELD_NAME,
+            CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)
+                .set(TEST_JSON_MAP_NO_IDX_FIELD_NAME, TEST_JSON_MAP_NODE).set(TEST_JSON_NESTED_MAP_FIELD_NAME,
+                CustomObjectNode.create().set(TEST_JSON_INT_NO_IDX_FIELD_NAME, TEST_INT_NODE)
+                    .set(TEST_JSON_STRING_NO_IDX_FIELD_NAME, TEST_JSON_STRING_NO_IDX_NODE)));
+
+    expectedJsonNodeWithMergedTextIndex = expectedJsonNode.deepCopy().set(MERGED_TEXT_INDEX_FIELD_NAME,
+        N.arrayNode().add("[0,1,2,3]:arrayField").add("a:stringField").add("[0,1,2,3]:nestedFields.arrayField")
+            .add("a:nestedFields.stringField"));
+    transformKeyValueTransformation(
+        schemaBuilder.addMultiValueDimension(MERGED_TEXT_INDEX_FIELD_NAME, DataType.STRING).build(), keyMapping,
+        pathToDrop, pathToPreserve, inputJsonNode, expectedJsonNodeWithMergedTextIndex);
+  }
+
+  private void transformWithIndexableFields(Schema schema, JsonNode inputRecordJsonNode,
+      JsonNode outputRecordJsonNode) {
+    testTransform(INDEXABLE_EXTRAS_FIELD_NAME, null, null, schema, null, null, null, inputRecordJsonNode.toString(),
+        outputRecordJsonNode.toString());
+  }
+
+  private void transformWithUnIndexableFieldsAndMergedTextIndex(Schema schema, JsonNode inputRecordJsonNode,
+      JsonNode outputRecordJsonNode) {
+    testTransform(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, MERGED_TEXT_INDEX_FIELD_NAME, schema,
+        null, null, null, inputRecordJsonNode.toString(), outputRecordJsonNode.toString());
+  }
+
+  private void transformKeyValueTransformation(Schema schema, Map<String, String> keyMapping,
+      Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve, JsonNode inputRecordJsonNode,
+      JsonNode outputRecordJsonNode) {
+    testTransform(INDEXABLE_EXTRAS_FIELD_NAME, UNINDEXABLE_EXTRAS_FIELD_NAME, MERGED_TEXT_INDEX_FIELD_NAME, schema,
+        keyMapping, fieldPathsToDrop, fieldPathsToPreserve, inputRecordJsonNode.toString(),
+        outputRecordJsonNode.toString());
+  }
+
+  private void testTransform(String indexableExtrasField, String unindexableExtrasField, String mergedTextIndexField,
+      Schema schema, Map<String, String> keyMapping, Set<String> fieldPathsToDrop, Set<String> fieldPathsToPreserve,
+      String inputRecordJSONString, String expectedOutputRecordJSONString) {
+    TableConfig tableConfig =
+        createDefaultTableConfig(indexableExtrasField, unindexableExtrasField, UNINDEXABLE_FIELD_SUFFIX,
+            fieldPathsToDrop, fieldPathsToPreserve, mergedTextIndexField);
+    tableConfig.getIngestionConfig().getSchemaConformingTransformerV2Config().setColumnNameToJsonKeyPathMap(keyMapping);
+    GenericRow outputRecord = transformRow(tableConfig, schema, inputRecordJSONString);
+    Map<String, Object> expectedOutputRecordMap = jsonStringToMap(expectedOutputRecordJSONString);
+
+    // The merged text index field does not need to have a deterministic order, so sort both sides before comparing
+    Object mergedTextIndexValue = outputRecord.getFieldToValueMap().get(MERGED_TEXT_INDEX_FIELD_NAME);
+    Object expectedMergedTextIndexValue = expectedOutputRecordMap.get(MERGED_TEXT_INDEX_FIELD_NAME);
+    if (mergedTextIndexValue != null) {
+      ((List<Object>) mergedTextIndexValue).sort(null);
+    }
+    if (expectedMergedTextIndexValue != null) {
+      ((List<Object>) expectedMergedTextIndexValue).sort(null);
+    }
+
+    Assert.assertNotNull(outputRecord);
+    Assert.assertEquals(outputRecord.getFieldToValueMap(), expectedOutputRecordMap);
+  }
+
+  /**
+   * Transforms the given row (given as a JSON string) using the transformer
+   * @return The transformed row
+   */
+  private GenericRow transformRow(TableConfig tableConfig, Schema schema, String inputRecordJSONString) {
+    Map<String, Object> inputRecordMap = jsonStringToMap(inputRecordJSONString);
+    GenericRow inputRecord = createRowFromMap(inputRecordMap);
+    SchemaConformingTransformerV2 schemaConformingTransformerV2 =
+        new SchemaConformingTransformerV2(tableConfig, schema);
+    return schemaConformingTransformerV2.transform(inputRecord);
+  }
+
+  /**
+   * @return A map representing the given JSON string
+   */
+  @Nonnull
+  private Map<String, Object> jsonStringToMap(String jsonString) {
+    try {
+      TypeReference<Map<String, Object>> typeRef = new TypeReference<>() {
+      };
+      return OBJECT_MAPPER.readValue(jsonString, typeRef);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+    // Should never reach here
+    return null;
+  }
+
+  /**
+   * @return A new generic row with all the kv-pairs from the given map
+   */
+  private GenericRow createRowFromMap(Map<String, Object> map) {
+    GenericRow record = new GenericRow();
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      record.putValue(entry.getKey(), entry.getValue());
+    }
+    return record;
+  }
+
+  @Test
+  public void testOverlappingSchemaFields() {
+    try {
+      Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b", DataType.STRING)
+          .addSingleValueDimension("a.b.c", DataType.INT).build();
+      SchemaConformingTransformerV2.validateSchema(schema,
+          new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null,
+              null, null, null, null, null, null));
+    } catch (Exception ex) {
+      fail("Should not have thrown any exception when overlapping schema fields occur");
+    }
+
+    try {
+      // This is a repeat of the previous test but with the fields reversed, in case they are processed in order
+      Schema schema = createDefaultSchemaBuilder().addSingleValueDimension("a.b.c", DataType.INT)
+          .addSingleValueDimension("a.b", DataType.STRING).build();
+      SchemaConformingTransformerV2.validateSchema(schema,
+          new SchemaConformingTransformerV2Config(null, INDEXABLE_EXTRAS_FIELD_NAME, null, null, null, null, null,
+              null, null, null, null, null, null));
+    } catch (Exception ex) {
+      fail("Should not have thrown any exception when overlapping schema fields occur");
+    }
+  }
+
+  @Test
+  public void testBase64ValueFilter() {
+    String text = "Hello world";
+    String binaryData = "ABCxyz12345-_+/=";
+    String binaryDataWithTrailingPeriods = "ABCxyz12345-_+/=..";
+    String binaryDataWithRandomPeriods = "A.BCxy.z12345-_+/=..";
+    String shortBinaryData = "short";
+    int minLength = 10;
+
+    assertFalse(_RECORD_TRANSFORMER.base64ValueFilter(text.getBytes(), minLength));
+    assertTrue(_RECORD_TRANSFORMER.base64ValueFilter(binaryData.getBytes(), minLength));
+    assertTrue(_RECORD_TRANSFORMER.base64ValueFilter(binaryDataWithTrailingPeriods.getBytes(), minLength));
+    assertFalse(_RECORD_TRANSFORMER.base64ValueFilter(binaryDataWithRandomPeriods.getBytes(), minLength));
+    assertFalse(_RECORD_TRANSFORMER.base64ValueFilter(shortBinaryData.getBytes(), minLength));
+  }
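+
+  // Note: the behavior below is inferred from the assertions in this test rather than from a documented
+  // contract -- base64ValueFilter appears to treat a value as binary only when it is at least minLength
+  // bytes long and consists solely of base64 characters (trailing periods tolerated); embedded spaces or
+  // periods, as in "Hello world", cause the value to be treated as regular text instead.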
+
+  @Test
+  public void testShingleIndexTokenization() {
+    String key = "key";
+    String value = "0123456789ABCDEFGHIJ";
+    int shingleIndexMaxLength;
+    int shingleIndexOverlapLength;
+    List<String> expectedTokenValues;
+
+    shingleIndexMaxLength = 8;
+    shingleIndexOverlapLength = 1;
+    expectedTokenValues = new ArrayList<>(
+        Arrays.asList("0123:key", "3456:key", "6789:key", "9ABC:key", "CDEF:key", "FGHI:key", "IJ:key"));
+    testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues);
+
+    shingleIndexMaxLength = 8;
+    shingleIndexOverlapLength = 2;
+    expectedTokenValues = new ArrayList<>(Arrays
+        .asList("0123:key", "2345:key", "4567:key", "6789:key", "89AB:key", "ABCD:key", "CDEF:key", "EFGH:key",
+            "GHIJ:key"));
+    testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues);
+
+    // If shingleIndexMaxLength is lower than the minimum required length for a merged text index token
+    // (length of the key + shingling overlap length + 1), then shingleIndexMaxLength is adjusted to
+    // the maximum Lucene token size (32766)
+    shingleIndexMaxLength = 1;
+    shingleIndexOverlapLength = 5;
+    expectedTokenValues = new ArrayList<>(Arrays.asList(value + ":" + key));
+    testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues);
+
+    // If shingleIndexOverlapLength is equal to or longer than the length of the value, shingling cannot be applied
+    // and only one token is generated.
+    shingleIndexMaxLength = 32766;
+    shingleIndexOverlapLength = 100;
+    expectedTokenValues = new ArrayList<>(Arrays.asList(value + ":" + key));
+    testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues);
+
+    // Another corner case where the result is the same as if shingling had not been applied
+    shingleIndexMaxLength = 300;
+    shingleIndexOverlapLength = 10;
+    expectedTokenValues = new ArrayList<>(Arrays.asList(value + ":" + key));
+    testShingleIndexWithParams(key, value, shingleIndexMaxLength, shingleIndexOverlapLength, expectedTokenValues);
+  }
+
+  private void testShingleIndexWithParams(String key, String value, Integer shingleIndexMaxLength,
+      Integer shingleIndexOverlapLength, List<String> expectedTokenValues) {
+    Map.Entry<String, Object> kv = new AbstractMap.SimpleEntry<>(key, value);
+    List<String> shingleIndexTokens = new ArrayList<>();
+    _RECORD_TRANSFORMER
+        .generateShingleTextIndexDocument(kv, shingleIndexTokens, shingleIndexMaxLength, shingleIndexOverlapLength);
+    int numTokens = shingleIndexTokens.size();
+    assertEquals(numTokens, expectedTokenValues.size());
+    for (int i = 0; i < numTokens; i++) {
+      assertEquals(shingleIndexTokens.get(i), expectedTokenValues.get(i));
+    }
+  }
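+
+  // Worked example (derived from the first case in testShingleIndexTokenization above): with key "key"
+  // (3 chars), shingleIndexMaxLength 8 and overlap 1, each token carries
+  // maxLength - key.length() - 1 = 4 value characters and adjacent windows share 1 character, so
+  // "0123456789ABCDEFGHIJ" shingles into "0123:key", "3456:key", "6789:key", ..., "IJ:key".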
+
+  static class CustomObjectNode extends ObjectNode {
+    public CustomObjectNode() {
+      super(OBJECT_MAPPER.getNodeFactory());
+    }
+
+    public static CustomObjectNode create() {
+      return new CustomObjectNode();
+    }
+
+    public CustomObjectNode set(String fieldName, JsonNode value) {
+      super.set(fieldName, value);
+      return this;
+    }
+
+    public CustomObjectNode setAll(ObjectNode other) {
+      super.setAll(other);
+      return this;
+    }
+
+    public CustomObjectNode removeAndReturn(String fieldName) {
+      super.remove(fieldName);
+      return this;
+    }
+
+    public CustomObjectNode deepCopy() {
+      return CustomObjectNode.create().setAll(this);
+    }
+  }
+
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/Base64UtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/Base64UtilsTest.java
new file mode 100644
index 0000000000..e067321404
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/Base64UtilsTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.nio.charset.StandardCharsets;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class Base64UtilsTest {
+  private static final String SPECIAL_CHARS = "+/=-_";
+  private static final String UPPER_CASE_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+  private static final String LOWER_CASE_CHARS = "abcdefghijklmnopqrstuvwxyz";
+  private static final String NUMBER_CHARS = "0123456789";
+  private static final String[] BASE64_STRINGS = {
+      UPPER_CASE_CHARS,
+      LOWER_CASE_CHARS,
+      SPECIAL_CHARS,
+      NUMBER_CHARS,
+      SPECIAL_CHARS + NUMBER_CHARS + LOWER_CASE_CHARS + UPPER_CASE_CHARS,
+      UPPER_CASE_CHARS + SPECIAL_CHARS + LOWER_CASE_CHARS + NUMBER_CHARS
+  };
+  private static final String[] BASE64_STRINGS_WITH_WHITE_SPACE = {
+      SPECIAL_CHARS + "\n" + NUMBER_CHARS + "\t" + LOWER_CASE_CHARS + " " + 
UPPER_CASE_CHARS,
+      UPPER_CASE_CHARS + "\n" + SPECIAL_CHARS + "\t" + LOWER_CASE_CHARS + " " 
+ NUMBER_CHARS
+  };
+  private static final String[] NON_BASE64_STRINGS = {
+      UPPER_CASE_CHARS + "!",
+      LOWER_CASE_CHARS + "@",
+      SPECIAL_CHARS + "#",
+      NUMBER_CHARS + "$",
+      SPECIAL_CHARS + ".." + NUMBER_CHARS + "?" + LOWER_CASE_CHARS + "^" + 
UPPER_CASE_CHARS + "*"
+  };
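+
+  // The fixtures above cover both the standard base64 alphabet (A-Z, a-z, 0-9, '+', '/', '=' padding)
+  // and the URL-safe variant ('-', '_'); the NON_BASE64 set appends characters outside both alphabets.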
+
+  @Test
+  public void testBase64IgnoreWhiteSpace() {
+    for (final String s : BASE64_STRINGS) {
+      assertTrue(Base64Utils.isBase64IgnoreWhiteSpace(s.getBytes(StandardCharsets.UTF_8)));
+      assertFalse(Base64Utils.isBase64IgnoreWhiteSpace((s + "..").getBytes(StandardCharsets.UTF_8)));
+    }
+
+    for (final String s : BASE64_STRINGS_WITH_WHITE_SPACE) {
+      assertTrue(Base64Utils.isBase64IgnoreWhiteSpace(s.getBytes(StandardCharsets.UTF_8)));
+      assertFalse(Base64Utils.isBase64IgnoreWhiteSpace((s + "..").getBytes(StandardCharsets.UTF_8)));
+    }
+
+    for (final String s : NON_BASE64_STRINGS) {
+      assertFalse(Base64Utils.isBase64IgnoreWhiteSpace(s.getBytes(StandardCharsets.UTF_8)));
+      assertFalse(Base64Utils.isBase64IgnoreWhiteSpace((s + "..").getBytes(StandardCharsets.UTF_8)));
+    }
+  }
+
+  @Test
+  public void testBase64IgnoreTrailingPeriods() {
+    for (final String s : BASE64_STRINGS) {
+      String testStr = s;
+      for (int i = 0; i < 10; i++) {
+        assertTrue(Base64Utils.isBase64IgnoreTrailingPeriods(testStr.getBytes(StandardCharsets.UTF_8)));
+        testStr = testStr + ".";
+      }
+    }
+
+    for (final String s : BASE64_STRINGS_WITH_WHITE_SPACE) {
+      String testStr = s;
+      for (int i = 0; i < 2; i++) {
+        assertFalse(Base64Utils.isBase64IgnoreTrailingPeriods(testStr.getBytes(StandardCharsets.UTF_8)));
+        testStr = testStr + ".";
+      }
+    }
+
+    for (final String s : NON_BASE64_STRINGS) {
+      String testStr = s;
+      for (int i = 0; i < 2; i++) {
+        assertFalse(Base64Utils.isBase64IgnoreTrailingPeriods(testStr.getBytes(StandardCharsets.UTF_8)));
+        testStr = testStr + ".";
+      }
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 5af9cdcc50..358cf35a43 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -51,6 +51,9 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Config related to the SchemaConformingTransformer")
   private SchemaConformingTransformerConfig _schemaConformingTransformerConfig;
 
+  @JsonPropertyDescription("Config related to the 
SchemaConformingTransformerV2")
+  private SchemaConformingTransformerV2Config 
_schemaConformingTransformerV2Config;
+
   @JsonPropertyDescription("Configs related to record aggregation function 
applied during ingestion")
   private List<AggregationConfig> _aggregationConfigs;
 
@@ -69,6 +72,7 @@ public class IngestionConfig extends BaseJsonConfig {
       @Nullable List<EnrichmentConfig> enrichmentConfigs,
       @Nullable List<TransformConfig> transformConfigs, @Nullable ComplexTypeConfig complexTypeConfig,
       @Nullable SchemaConformingTransformerConfig schemaConformingTransformerConfig,
+      @Nullable SchemaConformingTransformerV2Config schemaConformingTransformerV2Config,
       @Nullable List<AggregationConfig> aggregationConfigs) {
     _batchIngestionConfig = batchIngestionConfig;
     _streamIngestionConfig = streamIngestionConfig;
@@ -77,6 +81,7 @@ public class IngestionConfig extends BaseJsonConfig {
     _transformConfigs = transformConfigs;
     _complexTypeConfig = complexTypeConfig;
     _schemaConformingTransformerConfig = schemaConformingTransformerConfig;
+    _schemaConformingTransformerV2Config = schemaConformingTransformerV2Config;
     _aggregationConfigs = aggregationConfigs;
   }
 
@@ -118,6 +123,11 @@ public class IngestionConfig extends BaseJsonConfig {
     return _schemaConformingTransformerConfig;
   }
 
+  @Nullable
+  public SchemaConformingTransformerV2Config getSchemaConformingTransformerV2Config() {
+    return _schemaConformingTransformerV2Config;
+  }
+
   @Nullable
   public List<AggregationConfig> getAggregationConfigs() {
     return _aggregationConfigs;
@@ -164,6 +174,11 @@ public class IngestionConfig extends BaseJsonConfig {
     _schemaConformingTransformerConfig = schemaConformingTransformerConfig;
   }
 
+  public void setSchemaConformingTransformerV2Config(
+      SchemaConformingTransformerV2Config schemaConformingTransformerV2Config) {
+    _schemaConformingTransformerV2Config = schemaConformingTransformerV2Config;
+  }
+
   public void setAggregationConfigs(List<AggregationConfig> aggregationConfigs) {
     _aggregationConfigs = aggregationConfigs;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
index 96231f39d9..e51eb65e4a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerConfig.java
@@ -31,8 +31,8 @@ public class SchemaConformingTransformerConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Name of the field that should contain extra fields that are not part of the schema.")
   private final String _indexableExtrasField;
 
-  @JsonPropertyDescription(
-      "Like indexableExtrasField except it only contains fields with the suffix in unindexableFieldSuffix.")
+  @JsonPropertyDescription("Like indexableExtrasField except it only contains fields with the suffix in "
+      + "unindexableFieldSuffix.")
   private final String _unindexableExtrasField;
 
   @JsonPropertyDescription("The suffix of fields that must be stored in unindexableExtrasField")
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java
new file mode 100644
index 0000000000..1d58d76f81
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/SchemaConformingTransformerV2Config.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+public class SchemaConformingTransformerV2Config extends BaseJsonConfig {
+  @JsonPropertyDescription("Enable indexable extras")
+  private boolean _enableIndexableExtras = true;
+
+  @JsonPropertyDescription("Name of the field that should contain extra fields 
that are not part of the schema.")
+  private String _indexableExtrasField = "json_data";
+
+  @JsonPropertyDescription("Enable unindexable extras")
+  private boolean _enableUnindexableExtras = true;
+
+  @JsonPropertyDescription(
+      "Like indexableExtrasField except it only contains fields with the 
suffix in unindexableFieldSuffix.")
+  private String _unindexableExtrasField = "json_data_no_idx";
+
+  @JsonPropertyDescription("The suffix of fields that must be stored in 
unindexableExtrasField")
+  private String _unindexableFieldSuffix = "_noindex";
+
+  @JsonPropertyDescription("Array of flattened (dot-delimited) object paths to 
drop")
+  private Set<String> _fieldPathsToDrop = new HashSet<>();
+
+  @JsonPropertyDescription("Array of flattened (dot-delimited) object paths 
not to traverse further and keep same as "
+      + "input. This will also skip building mergedTextIndex for the field.")
+  private Set<String> _fieldPathsToPreserveInput = new HashSet<>();
+
+  @JsonPropertyDescription("Map from customized meaningful column name to json 
key path")
+  private Map<String, String> _columnNameToJsonKeyPathMap = new HashMap<>();
+
+  @JsonPropertyDescription("mergedTextIndex field")
+  private String _mergedTextIndexField = "__mergedTextIndex";
+
+  @JsonPropertyDescription("mergedTextIndex document max length")
+  private int _mergedTextIndexDocumentMaxLength = 32766;
+
+  @JsonPropertyDescription(
+      "Recall that the merged text index document is in the format of <value:key>. "
+          + "The mergedTextIndex shingling overlap length refers to the "
+          + "maximum search length of the value that will yield results with "
+          + "100% accuracy. If the value is null, the shingle index will be turned off "
+          + "and the value will be truncated such that the document length is equal to "
+          + "_mergedTextIndexDocumentMaxLength"
+  )
+  private @Nullable Integer _mergedTextIndexShinglingOverlapLength = null;
+
+  @JsonPropertyDescription("mergedTextIndex binary document detection minimum 
length")
+  private Integer _mergedTextIndexBinaryDocumentDetectionMinLength = 512;
+
+  @JsonPropertyDescription("Array of paths to exclude from merged text index.")
+  private Set<String> _mergedTextIndexPathToExclude = new HashSet<>();
+
+  // TODO: set the default value from CLPRewriter once it is open sourced
+  @JsonPropertyDescription("Array of suffixes to exclude from the merged text index.")
+  private List<String> _mergedTextIndexSuffixToExclude = Arrays.asList("_logtype", "_dictionaryVars", "_encodedVars");
+
+  @JsonPropertyDescription("Dedicated fields to double ingest into json_data 
column")
+  private Set<String> _fieldsToDoubleIngest = new HashSet<>();
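+
+  // Illustrative example (not normative; key names taken from the @JsonProperty annotations below) of how
+  // these properties might appear in the table config JSON under
+  // ingestionConfig.schemaConformingTransformerV2Config, relying on the defaults declared above:
+  //   { "indexableExtrasField": "json_data", "unindexableExtrasField": "json_data_no_idx",
+  //     "unindexableFieldSuffix": "_noindex", "mergedTextIndexField": "__mergedTextIndex" }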
+
+  @JsonCreator
+  public SchemaConformingTransformerV2Config(
+      @JsonProperty("enableIndexableExtras") @Nullable Boolean enableIndexableExtras,
+      @JsonProperty("indexableExtrasField") String indexableExtrasField,
+      @JsonProperty("enableUnindexableExtras") @Nullable Boolean enableUnindexableExtras,
+      @JsonProperty("unindexableExtrasField") @Nullable String unindexableExtrasField,
+      @JsonProperty("unindexableFieldSuffix") @Nullable String unindexableFieldSuffix,
+      @JsonProperty("fieldPathsToDrop") @Nullable Set<String> fieldPathsToDrop,
+      @JsonProperty("fieldPathsToKeepSameAsInput") @Nullable Set<String> fieldPathsToPreserveInput,
+      @JsonProperty("mergedTextIndexField") @Nullable String mergedTextIndexField,
+      @JsonProperty("mergedTextIndexDocumentMaxLength") @Nullable Integer mergedTextIndexDocumentMaxLength,
+      @JsonProperty("mergedTextIndexShinglingOverlapLength") @Nullable Integer mergedTextIndexShinglingOverlapLength,
+      @JsonProperty("mergedTextIndexBinaryDocumentDetectionMinLength")
+      @Nullable Integer mergedTextIndexBinaryDocumentDetectionMinLength,
+      @JsonProperty("mergedTextIndexPathToExclude") @Nullable Set<String> mergedTextIndexPathToExclude,
+      @JsonProperty("fieldsToDoubleIngest") @Nullable Set<String> fieldsToDoubleIngest
+  ) {
+    setEnableIndexableExtras(enableIndexableExtras);
+    setIndexableExtrasField(indexableExtrasField);
+    setEnableUnindexableExtras(enableUnindexableExtras);
+    setUnindexableExtrasField(unindexableExtrasField);
+    setUnindexableFieldSuffix(unindexableFieldSuffix);
+    setFieldPathsToDrop(fieldPathsToDrop);
+    setFieldPathsToPreserveInput(fieldPathsToPreserveInput);
+
+    setMergedTextIndexField(mergedTextIndexField);
+    setMergedTextIndexDocumentMaxLength(mergedTextIndexDocumentMaxLength);
+    setMergedTextIndexShinglingDocumentOverlapLength(mergedTextIndexShinglingOverlapLength);
+    setMergedTextIndexBinaryDocumentDetectionMinLength(mergedTextIndexBinaryDocumentDetectionMinLength);
+    setMergedTextIndexPathToExclude(mergedTextIndexPathToExclude);
+    setFieldsToDoubleIngest(fieldsToDoubleIngest);
+  }
+
+  public SchemaConformingTransformerV2Config setEnableIndexableExtras(Boolean enableIndexableExtras) {
+    _enableIndexableExtras = enableIndexableExtras == null ? _enableIndexableExtras : enableIndexableExtras;
+    return this;
+  }
+
+  public String getIndexableExtrasField() {
+    return _enableIndexableExtras ? _indexableExtrasField : null;
+  }
+
+  public SchemaConformingTransformerV2Config setIndexableExtrasField(String indexableExtrasField) {
+    _indexableExtrasField = indexableExtrasField == null ? _indexableExtrasField : indexableExtrasField;
+    return this;
+  }
+
+  public SchemaConformingTransformerV2Config setEnableUnindexableExtras(Boolean enableUnindexableExtras) {
+    _enableUnindexableExtras = enableUnindexableExtras == null ? _enableUnindexableExtras : enableUnindexableExtras;
+    return this;
+  }
+
+  public String getUnindexableExtrasField() {
+    return _enableUnindexableExtras ? _unindexableExtrasField : null;
+  }
+
+  public SchemaConformingTransformerV2Config setUnindexableExtrasField(String unindexableExtrasField) {
+    _unindexableExtrasField = unindexableExtrasField == null ? _unindexableExtrasField : unindexableExtrasField;
+    return this;
+  }
+
+  public String getUnindexableFieldSuffix() {
+    return _unindexableFieldSuffix;
+  }
+
+  public SchemaConformingTransformerV2Config setUnindexableFieldSuffix(String unindexableFieldSuffix) {
+    _unindexableFieldSuffix = unindexableFieldSuffix == null ? _unindexableFieldSuffix : unindexableFieldSuffix;
+    return this;
+  }
+
+  public Set<String> getFieldPathsToDrop() {
+    return _fieldPathsToDrop;
+  }
+
+  public SchemaConformingTransformerV2Config setFieldPathsToDrop(Set<String> fieldPathsToDrop) {
+    _fieldPathsToDrop = fieldPathsToDrop == null ? _fieldPathsToDrop : fieldPathsToDrop;
+    return this;
+  }
+
+  public Set<String> getFieldPathsToPreserveInput() {
+    return _fieldPathsToPreserveInput;
+  }
+
+  public SchemaConformingTransformerV2Config setFieldPathsToPreserveInput(Set<String> fieldPathsToPreserveInput) {
+    _fieldPathsToPreserveInput = fieldPathsToPreserveInput == null ? _fieldPathsToPreserveInput
+        : fieldPathsToPreserveInput;
+    return this;
+  }
+
+  public Map<String, String> getColumnNameToJsonKeyPathMap() {
+    return _columnNameToJsonKeyPathMap;
+  }
+
+  public SchemaConformingTransformerV2Config setColumnNameToJsonKeyPathMap(
+      Map<String, String> columnNameToJsonKeyPathMap) {
+    _columnNameToJsonKeyPathMap = columnNameToJsonKeyPathMap == null
+        ? _columnNameToJsonKeyPathMap : columnNameToJsonKeyPathMap;
+    return this;
+  }
+
+  public String getMergedTextIndexField() {
+    return _mergedTextIndexField;
+  }
+
+  public SchemaConformingTransformerV2Config setMergedTextIndexField(String mergedTextIndexField) {
+    _mergedTextIndexField = mergedTextIndexField == null ? _mergedTextIndexField : mergedTextIndexField;
+    return this;
+  }
+
+  public Integer getMergedTextIndexDocumentMaxLength() {
+    return _mergedTextIndexDocumentMaxLength;
+  }
+
+  public SchemaConformingTransformerV2Config setMergedTextIndexDocumentMaxLength(
+      Integer mergedTextIndexDocumentMaxLength
+  ) {
+    _mergedTextIndexDocumentMaxLength = mergedTextIndexDocumentMaxLength == null
+        ? _mergedTextIndexDocumentMaxLength : mergedTextIndexDocumentMaxLength;
+    return this;
+  }
+
+  public Integer getMergedTextIndexShinglingOverlapLength() {
+    return _mergedTextIndexShinglingOverlapLength;
+  }
+
+  public SchemaConformingTransformerV2Config setMergedTextIndexShinglingDocumentOverlapLength(
+      Integer mergedTextIndexShinglingOverlapLength) {
+    _mergedTextIndexShinglingOverlapLength = mergedTextIndexShinglingOverlapLength;
+    return this;
+  }
+
+  public Integer getMergedTextIndexBinaryDocumentDetectionMinLength() {
+    return _mergedTextIndexBinaryDocumentDetectionMinLength;
+  }
+
+  public SchemaConformingTransformerV2Config setMergedTextIndexBinaryDocumentDetectionMinLength(
+      Integer mergedTextIndexBinaryDocumentDetectionMinLength) {
+    _mergedTextIndexBinaryDocumentDetectionMinLength = mergedTextIndexBinaryDocumentDetectionMinLength == null
+        ? _mergedTextIndexBinaryDocumentDetectionMinLength : mergedTextIndexBinaryDocumentDetectionMinLength;
+    return this;
+  }
+
+  public Set<String> getMergedTextIndexPathToExclude() {
+    return _mergedTextIndexPathToExclude;
+  }
+
+  public List<String> getMergedTextIndexSuffixToExclude() {
+    return _mergedTextIndexSuffixToExclude;
+  }
+
+  public SchemaConformingTransformerV2Config setMergedTextIndexPathToExclude(
+      Set<String> mergedTextIndexPathToExclude) {
+    _mergedTextIndexPathToExclude = mergedTextIndexPathToExclude == null
+        ? _mergedTextIndexPathToExclude : mergedTextIndexPathToExclude;
+    return this;
+  }
+
+  public Set<String> getFieldsToDoubleIngest() {
+    return _fieldsToDoubleIngest;
+  }
+
+  public SchemaConformingTransformerV2Config setFieldsToDoubleIngest(Set<String> fieldsToDoubleIngest) {
+    _fieldsToDoubleIngest = fieldsToDoubleIngest == null ? _fieldsToDoubleIngest : fieldsToDoubleIngest;
+    return this;
+  }
+}
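
For reference, a minimal sketch (editorial, not part of this commit; the null arguments simply accept the defaults shown above) of wiring the new config into an IngestionConfig via the constructor and setter added in this change:

    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setSchemaConformingTransformerV2Config(
        new SchemaConformingTransformerV2Config(true, "json_data", true, "json_data_no_idx",
            "_noindex", null, null, "__mergedTextIndex", null, null, null, null, null));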

