This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9822100fc0 Add config to skip record ingestion on string column length 
exceeding configured max schema length (#13103)
9822100fc0 is described below

commit 9822100fc0dd5c6ddb6d5463c7b76d16a0e4cd1e
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Tue Jun 4 22:44:49 2024 +0530

    Add config to skip record ingestion on string column length exceeding 
configured max schema length (#13103)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../apache/pinot/common/data/FieldSpecTest.java    |   3 +-
 .../realtime/RealtimeSegmentDataManager.java       |   6 +
 .../recordtransformer/SanitizationTransformer.java | 190 ++++++++++++++++---
 .../local/segment/creator/TransformPipeline.java   |  12 ++
 .../recordtransformer/RecordTransformerTest.java   | 209 ++++++++++++++++++++-
 .../apache/pinot/spi/data/DimensionFieldSpec.java  |   8 +-
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  27 +++
 .../org/apache/pinot/spi/data/MetricFieldSpec.java |   2 +-
 .../apache/pinot/spi/data/readers/GenericRow.java  |   2 +
 10 files changed, 434 insertions(+), 26 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index a51368a33f..516584950d 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -35,6 +35,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   DELETED_SEGMENT_COUNT("segments", false),
   DELETE_TABLE_FAILURES("tables", false),
   REALTIME_ROWS_CONSUMED("rows", true),
+  REALTIME_ROWS_SANITIZED("rows", true),
   REALTIME_ROWS_FETCHED("rows", false),
   REALTIME_ROWS_FILTERED("rows", false),
   INVALID_REALTIME_ROWS_DROPPED("rows", false),
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java 
b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
index 91641e1afa..98f03b2ed9 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
@@ -312,10 +312,11 @@ public class FieldSpecTest {
     // Multi-value dimension field with default null value.
     dimensionFields = new String[]{
         "\"name\":\"dimension\"", "\"dataType\":\"STRING\"", 
"\"singleValueField\":false",
-        "\"defaultNullValue\":\"default\""
+        "\"defaultNullValue\":\"default\", \"maxLengthExceedStrategy\": 
\"TRIM_LENGTH\""
     };
     dimensionFieldSpec1 = 
JsonUtils.stringToObject(getRandomOrderJsonString(dimensionFields), 
DimensionFieldSpec.class);
     dimensionFieldSpec2 = new DimensionFieldSpec("dimension", STRING, false, 
"default");
+    
dimensionFieldSpec2.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH);
     Assert.assertEquals(dimensionFieldSpec1, dimensionFieldSpec2, 
ERROR_MESSAGE);
     Assert.assertEquals(dimensionFieldSpec1.getDefaultNullValue(), "default", 
ERROR_MESSAGE);
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index bcc3223141..f50942c588 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -540,6 +540,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     PinotMeter realtimeRowsConsumedMeter = null;
     PinotMeter realtimeRowsDroppedMeter = null;
     PinotMeter realtimeIncompleteRowsConsumedMeter = null;
+    PinotMeter realtimeRowsSanitizedMeter = null;
 
     int indexedMessageCount = 0;
     int streamMessageCount = 0;
@@ -625,6 +626,11 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
                   reusedResult.getIncompleteRowCount(), 
realtimeIncompleteRowsConsumedMeter);
         }
+        if (reusedResult.getSanitizedRowCount() > 0) {
+          realtimeRowsSanitizedMeter =
+              _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.REALTIME_ROWS_SANITIZED,
+                  reusedResult.getSanitizedRowCount(), 
realtimeRowsSanitizedMeter);
+        }
         List<GenericRow> transformedRows = reusedResult.getTransformedRows();
         for (GenericRow transformedRow : transformedRows) {
           try {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
index beb8098fba..eb407fbfc2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java
@@ -18,10 +18,11 @@
  */
 package org.apache.pinot.segment.local.recordtransformer;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.StringUtil;
@@ -32,51 +33,196 @@ import org.apache.pinot.spi.utils.StringUtil;
  * <ul>
  *   <li>No {@code null} characters in string values</li>
  *   <li>String values are within the length limit</li>
- *   TODO: add length limit to BYTES values if necessary
  * </ul>
  * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all 
values follow the data types in
  * {@link FieldSpec}.
+ * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide 
what to do when the value exceeds the max.
+ * For TRIM_LENGTH, the value is trimmed to the max length.
+ * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null 
value string.
+ * For ERROR, an exception is thrown and the record is skipped.
+ * For NO_ACTION, the value is kept as is unless it contains a null character, 
in which case it is trimmed at the null.
+ * In the first 2 scenarios, the REALTIME_ROWS_SANITIZED metric can be 
tracked to know if a trimmed /
+ * default record was persisted.
+ * In the third scenario, the ROWS_WITH_ERRORS metric can be tracked to know 
if a record was skipped.
+ * In the last scenario, the REALTIME_ROWS_SANITIZED metric can be tracked to 
know if a record was trimmed
+ * due to having a null character.
  */
 public class SanitizationTransformer implements RecordTransformer {
-  private final Map<String, Integer> _stringColumnMaxLengthMap = new 
HashMap<>();
+  private static final String NULL_CHARACTER = "\0";
+  private final Map<String, SanitizedColumnInfo> _columnToColumnInfoMap = new 
HashMap<>();
 
   public SanitizationTransformer(Schema schema) {
+    FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy;
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-      if (!fieldSpec.isVirtualColumn() && fieldSpec.getDataType() == 
DataType.STRING) {
-        _stringColumnMaxLengthMap.put(fieldSpec.getName(), 
fieldSpec.getMaxLength());
+      if (!fieldSpec.isVirtualColumn()) {
+        if (fieldSpec.getDataType().equals(FieldSpec.DataType.STRING)) {
+          maxLengthExceedStrategy =
+              fieldSpec.getMaxLengthExceedStrategy() == null ? 
FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH
+                  : fieldSpec.getMaxLengthExceedStrategy();
+          _columnToColumnInfoMap.put(fieldSpec.getName(), new 
SanitizedColumnInfo(fieldSpec.getName(),
+              fieldSpec.getMaxLength(), maxLengthExceedStrategy, 
fieldSpec.getDefaultNullValue()));
+        } else if (fieldSpec.getDataType().equals(FieldSpec.DataType.JSON) || 
fieldSpec.getDataType()
+            .equals(FieldSpec.DataType.BYTES)) {
+          maxLengthExceedStrategy = fieldSpec.getMaxLengthExceedStrategy() == 
null
+              ? FieldSpec.MaxLengthExceedStrategy.NO_ACTION : 
fieldSpec.getMaxLengthExceedStrategy();
+          if 
(maxLengthExceedStrategy.equals(FieldSpec.MaxLengthExceedStrategy.NO_ACTION)) {
+            continue;
+          }
+          _columnToColumnInfoMap.put(fieldSpec.getName(), new 
SanitizedColumnInfo(fieldSpec.getName(),
+              fieldSpec.getMaxLength(), 
fieldSpec.getMaxLengthExceedStrategy(), fieldSpec.getDefaultNullValue()));
+        }
       }
     }
   }
 
   @Override
   public boolean isNoOp() {
-    return _stringColumnMaxLengthMap.isEmpty();
+    return _columnToColumnInfoMap.isEmpty();
   }
 
   @Override
   public GenericRow transform(GenericRow record) {
-    for (Map.Entry<String, Integer> entry : 
_stringColumnMaxLengthMap.entrySet()) {
-      String stringColumn = entry.getKey();
-      int maxLength = entry.getValue();
-      Object value = record.getValue(stringColumn);
-      if (value instanceof String) {
-        // Single-valued column
-        String stringValue = (String) value;
-        String sanitizedValue = StringUtil.sanitizeStringValue(stringValue, 
maxLength);
-        // NOTE: reference comparison
-        //noinspection StringEquality
-        if (sanitizedValue != stringValue) {
-          record.putValue(stringColumn, sanitizedValue);
+    for (Map.Entry<String, SanitizedColumnInfo> entry : 
_columnToColumnInfoMap.entrySet()) {
+      String columnName = entry.getKey();
+      Object value = record.getValue(columnName);
+      Pair<?, Boolean> result;
+      if (value instanceof byte[]) {
+        // Single-valued BYTES column
+        result = sanitizeBytesValue(columnName, (byte[]) value, 
entry.getValue());
+        record.putValue(columnName, result.getLeft());
+        if (result.getRight()) {
+          record.putValue(GenericRow.SANITIZED_RECORD_KEY, true);
+        }
+      } else if (value instanceof String) {
+        // Single-valued String column
+        result = sanitizeValue(columnName, (String) value, entry.getValue());
+        record.putValue(columnName, result.getLeft());
+        if (result.getRight()) {
+          record.putValue(GenericRow.SANITIZED_RECORD_KEY, true);
         }
       } else {
-        // Multi-valued column
+        // Multi-valued String / BYTES column
         Object[] values = (Object[]) value;
-        int numValues = values.length;
-        for (int i = 0; i < numValues; i++) {
-          values[i] = StringUtil.sanitizeStringValue(values[i].toString(), 
maxLength);
+        for (int i = 0; i < values.length; i++) {
+          if (values[i] instanceof byte[]) {
+            result = sanitizeBytesValue(columnName, (byte[]) values[i], 
entry.getValue());
+          } else {
+            result = sanitizeValue(columnName, values[i].toString(), 
entry.getValue());
+          }
+          values[i] = result.getLeft();
+          if (result.getRight()) {
+            record.putValue(GenericRow.SANITIZED_RECORD_KEY, true);
+          }
         }
       }
     }
     return record;
   }
+
+  /**
+   * Sanitize the value for the given column.
+   * @param columnName column name
+   * @param value value of the column
+   * @param sanitizedColumnInfo metadata from field spec of the column defined 
in schema
+   * @return the sanitized value and a boolean indicating if the value was 
sanitized
+   */
+  private Pair<String, Boolean> sanitizeValue(String columnName, String value,
+      SanitizedColumnInfo sanitizedColumnInfo) {
+    String sanitizedValue = StringUtil.sanitizeStringValue(value, 
sanitizedColumnInfo.getMaxLength());
+    FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = 
sanitizedColumnInfo.getMaxLengthExceedStrategy();
+    int index;
+    // NOTE: reference comparison
+    // noinspection StringEquality
+    if (sanitizedValue != value) {
+      switch (maxLengthExceedStrategy) {
+        case TRIM_LENGTH:
+          return Pair.of(sanitizedValue, true);
+        case SUBSTITUTE_DEFAULT_VALUE:
+          return 
Pair.of(FieldSpec.getStringValue(sanitizedColumnInfo.getDefaultNullValue()), 
true);
+        case ERROR:
+          index = value.indexOf(NULL_CHARACTER);
+          if (index < 0) {
+            throw new IllegalStateException(
+                String.format("Throwing exception as value: %s for column %s 
exceeds configured max length %d.", value,
+                    columnName, sanitizedColumnInfo.getMaxLength()));
+          } else {
+            throw new IllegalStateException(
+                String.format("Throwing exception as value: %s for column %s 
contains null character.", value,
+                    columnName));
+          }
+        case NO_ACTION:
+          index = value.indexOf(NULL_CHARACTER);
+          if (index < 0) {
+            return Pair.of(value, false);
+          } else {
+            return Pair.of(sanitizedValue, true);
+          }
+        default:
+          throw new IllegalStateException(
+              "Unsupported max length exceed strategy: " + 
sanitizedColumnInfo.getMaxLengthExceedStrategy());
+      }
+    }
+    return Pair.of(sanitizedValue, false);
+  }
+
+  /**
+   * Sanitize the value for the given column.
+   * @param columnName column name
+   * @param value value of the column
+   * @param sanitizedColumnInfo metadata from field spec of the column defined 
in schema
+   * @return the sanitized value and a boolean indicating if the value was 
sanitized
+   */
+  private Pair<byte[], Boolean> sanitizeBytesValue(String columnName, byte[] 
value,
+      SanitizedColumnInfo sanitizedColumnInfo) {
+    if (value.length > sanitizedColumnInfo.getMaxLength()) {
+      FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = 
sanitizedColumnInfo.getMaxLengthExceedStrategy();
+      switch (maxLengthExceedStrategy) {
+        case TRIM_LENGTH:
+          return Pair.of(Arrays.copyOf(value, 
sanitizedColumnInfo.getMaxLength()), true);
+        case SUBSTITUTE_DEFAULT_VALUE:
+          return Pair.of((byte[]) sanitizedColumnInfo.getDefaultNullValue(), 
true);
+        case ERROR:
+          throw new IllegalStateException(
+              String.format("Throwing exception as value for column %s exceeds 
configured max length %d.", columnName,
+                  sanitizedColumnInfo.getMaxLength()));
+        case NO_ACTION:
+          return Pair.of(value, false);
+        default:
+          throw new IllegalStateException(
+              "Unsupported max length exceed strategy: " + 
sanitizedColumnInfo.getMaxLengthExceedStrategy());
+      }
+    }
+    return Pair.of(value, false);
+  }
+
+  private static class SanitizedColumnInfo {
+    private final String _columnName;
+    private final int _maxLength;
+    private final FieldSpec.MaxLengthExceedStrategy _maxLengthExceedStrategy;
+    private final Object _defaultNullValue;
+
+    private SanitizedColumnInfo(String columnName, int maxLength,
+        FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy, Object 
defaultNullValue) {
+      _columnName = columnName;
+      _maxLength = maxLength;
+      _maxLengthExceedStrategy = maxLengthExceedStrategy;
+      _defaultNullValue = defaultNullValue;
+    }
+
+    public String getColumnName() {
+      return _columnName;
+    }
+
+    public int getMaxLength() {
+      return _maxLength;
+    }
+
+    public FieldSpec.MaxLengthExceedStrategy getMaxLengthExceedStrategy() {
+      return _maxLengthExceedStrategy;
+    }
+
+    public Object getDefaultNullValue() {
+      return _defaultNullValue;
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
index 4ab27ffc46..f594c4d00e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
@@ -100,6 +100,9 @@ public class TransformPipeline {
       if 
(Boolean.TRUE.equals(transformedRow.getValue(GenericRow.INCOMPLETE_RECORD_KEY)))
 {
         reusedResult.incIncompleteRowCount();
       }
+      if 
(Boolean.TRUE.equals(transformedRow.getValue(GenericRow.SANITIZED_RECORD_KEY))) 
{
+        reusedResult.incSanitizedRowCount();
+      }
     } else {
       reusedResult.incSkippedRowCount();
     }
@@ -112,6 +115,7 @@ public class TransformPipeline {
     private final List<GenericRow> _transformedRows = new ArrayList<>();
     private int _skippedRowCount = 0;
     private int _incompleteRowCount = 0;
+    private int _sanitizedRowCount = 0;
 
     public List<GenericRow> getTransformedRows() {
       return _transformedRows;
@@ -137,6 +141,14 @@ public class TransformPipeline {
       _incompleteRowCount++;
     }
 
+    public void incSanitizedRowCount() {
+      _sanitizedRowCount++;
+    }
+
+    public int getSanitizedRowCount() {
+      return _sanitizedRowCount;
+    }
+
     public void reset() {
       _skippedRowCount = 0;
       _incompleteRowCount = 0;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index 46a4181a2b..696f2b1f48 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -260,7 +261,9 @@ public class RecordTransformerTest {
   }
 
   @Test
-  public void testSanitationTransformer() {
+  public void testSanitizationTransformer() {
+    // scenario where string contains null and exceeds max length
+    // and fieldSpec maxLengthExceedStrategy is default (TRIM_LENGTH)
     RecordTransformer transformer = new SanitizationTransformer(SCHEMA);
     GenericRow record = getRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
@@ -272,6 +275,210 @@ public class RecordTransformerTest {
       assertEquals(record.getValue("mvString2"), new Object[]{"123", "123", 
"123.0", "123.0", "123"});
       assertNull(record.getValue("$virtual"));
       assertTrue(record.getNullValueFields().isEmpty());
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where string contains null and fieldSpec 
maxLengthExceedStrategy is to ERROR
+    Schema schema = SCHEMA;
+    schema.getFieldSpecFor("svStringWithNullCharacters")
+        .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.ERROR);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      try {
+        record = transformer.transform(record);
+      } catch (Exception e) {
+        assertTrue(e instanceof IllegalStateException);
+        assertEquals(e.getMessage(), "Throwing exception as value: 1\0002\0003 
for column "
+            + "svStringWithNullCharacters contains null character.");
+      }
+    }
+
+    // scenario where string contains null and fieldSpec 
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svStringWithNullCharacters")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svStringWithNullCharacters"), "null");
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where string exceeds max length and fieldSpec 
maxLengthExceedStrategy is to ERROR
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svStringWithLengthLimit")
+        .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.ERROR);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      try {
+        record = transformer.transform(record);
+      } catch (Exception e) {
+        assertTrue(e instanceof IllegalStateException);
+        assertEquals(e.getMessage(), "Throwing exception as value: 123 for 
column svStringWithLengthLimit "
+            + "exceeds configured max length 2.");
+      }
+    }
+
+    // scenario where string exceeds max length and fieldSpec 
maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svStringWithLengthLimit")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svStringWithLengthLimit"), "null");
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where string exceeds max length and fieldSpec 
maxLengthExceedStrategy is to NO_ACTION
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svStringWithLengthLimit")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svStringWithLengthLimit"), "123");
+    }
+
+    // scenario where string contains null and fieldSpec 
maxLengthExceedStrategy is to NO_ACTION
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svStringWithNullCharacters")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svStringWithNullCharacters"), "1");
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where json field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to NO_ACTION
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svJson").setMaxLength(10);
+    schema.getFieldSpecFor("svJson")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svJson"), "{\"first\": \"daffy\", 
\"last\": \"duck\"}");
+    }
+
+    // scenario where json field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to TRIM_LENGTH
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svJson").setMaxLength(10);
+    schema.getFieldSpecFor("svJson")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svJson"), "{\"first\": ");
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where json field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to
+    // SUBSTITUTE_DEFAULT_VALUE
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svJson").setMaxLength(10);
+    schema.getFieldSpecFor("svJson")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svJson"), "null");
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where json field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to
+    // SUBSTITUTE_DEFAULT_VALUE
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svJson").setMaxLength(10);
+    schema.getFieldSpecFor("svJson")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      try {
+        record = transformer.transform(record);
+      } catch (Exception e) {
+        assertTrue(e instanceof IllegalStateException);
+        assertEquals(e.getMessage(), "Throwing exception as value: "
+            + "{\"first\": \"daffy\", \"last\": \"duck\"} for column "
+            + "svJson exceeds configured max length 10.");
+      }
+    }
+
+    // scenario where bytes field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to NO_ACTION
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svBytes").setMaxLength(2);
+    schema.getFieldSpecFor("svBytes")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svBytes"), "7b7b");
+    }
+
+    // scenario where bytes field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to TRIM_LENGTH
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svBytes").setMaxLength(2);
+    schema.getFieldSpecFor("svBytes")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svBytes"), "7b");
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where bytes field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to
+    // SUBSTITUTE_DEFAULT_VALUE
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svBytes").setMaxLength(2);
+    schema.getFieldSpecFor("svBytes")
+        
.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svBytes"), BytesUtils.toHexString(new 
byte[0]));
+      
assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY));
+    }
+
+    // scenario where bytes field exceeds max length and fieldSpec 
maxLengthExceedStrategy is to ERROR
+    schema = SCHEMA;
+    schema.getFieldSpecFor("svBytes").setMaxLength(2);
+    schema.getFieldSpecFor("svBytes")
+        .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.ERROR);
+    transformer = new SanitizationTransformer(schema);
+    record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      try {
+        record = transformer.transform(record);
+      } catch (Exception e) {
+        assertTrue(e instanceof IllegalStateException);
+        assertEquals(e.getMessage(), "Throwing exception as value: 7b7b for 
column svBytes "
+            + "exceeds configured max length 2.");
+      }
     }
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java
index 11545a0278..0637abd7fa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java
@@ -45,6 +45,11 @@ public final class DimensionFieldSpec extends FieldSpec {
     super(name, dataType, isSingleValueField, maxLength, defaultNullValue);
   }
 
+  public DimensionFieldSpec(String name, DataType dataType, boolean 
isSingleValueField, int maxLength,
+      @Nullable Object defaultNullValue, @Nullable MaxLengthExceedStrategy 
maxLengthExceedStrategy) {
+    super(name, dataType, isSingleValueField, maxLength, defaultNullValue, 
maxLengthExceedStrategy);
+  }
+
   public DimensionFieldSpec(String name, DataType dataType, boolean 
isSingleValueField,
       Class virtualColumnProviderClass) {
     super(name, dataType, isSingleValueField);
@@ -66,6 +71,7 @@ public final class DimensionFieldSpec extends FieldSpec {
   @Override
   public String toString() {
     return "< field type: DIMENSION, field name: " + _name + ", data type: " + 
_dataType + ", is single-value field: "
-        + _isSingleValueField + ", default null value: " + _defaultNullValue + 
" >";
+        + _isSingleValueField + ", default null value: " + _defaultNullValue + 
", max length exceed strategy: "
+        + _maxLengthExceedStrategy + " >";
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index d5f3f7e403..5fa586b6b9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -46,6 +46,8 @@ import org.apache.pinot.spi.utils.TimestampUtils;
  * <p>- <code>DefaultNullValue</code>: when no value found for this field, use 
this value. Stored in string format.
  * <p>- <code>VirtualColumnProvider</code>: the virtual column provider to use 
for this field.
  * <p>- <code>NotNull</code>: whether the column accepts nulls or not. 
Defaults to false.
+ * <p>- <code>MaxLength</code>: the maximum length of the string column. 
Defaults to 512.
+ * <p>- <code>MaxLengthExceedStrategy</code>: the strategy to apply when a 
string value exceeds the max length.
  */
 @SuppressWarnings("unused")
 public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable 
{
@@ -98,6 +100,10 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
     }
   }
 
+  public enum MaxLengthExceedStrategy {
+    TRIM_LENGTH, ERROR, SUBSTITUTE_DEFAULT_VALUE, NO_ACTION
+  }
+
   protected String _name;
   protected DataType _dataType;
   protected boolean _isSingleValueField = true;
@@ -106,6 +112,9 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
   // NOTE: This only applies to STRING column, which is the max number of 
characters
   private int _maxLength = DEFAULT_MAX_LENGTH;
 
+  // NOTE: This only applies to STRING column during {@link 
SanitizationTransformer}
+  protected MaxLengthExceedStrategy _maxLengthExceedStrategy;
+
   protected Object _defaultNullValue;
   private transient String _stringDefaultNullValue;
 
@@ -129,11 +138,17 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
 
   public FieldSpec(String name, DataType dataType, boolean isSingleValueField, 
int maxLength,
       @Nullable Object defaultNullValue) {
+    this(name, dataType, isSingleValueField, maxLength, defaultNullValue, 
null);
+  }
+
+  public FieldSpec(String name, DataType dataType, boolean isSingleValueField, 
int maxLength,
+      @Nullable Object defaultNullValue, @Nullable MaxLengthExceedStrategy 
maxLengthExceedStrategy) {
     _name = name;
     _dataType = dataType;
     _isSingleValueField = isSingleValueField;
     _maxLength = maxLength;
     setDefaultNullValue(defaultNullValue);
+    _maxLengthExceedStrategy = maxLengthExceedStrategy;
   }
 
   public abstract FieldType getFieldType();
@@ -175,6 +190,16 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
     _maxLength = maxLength;
   }
 
+  @Nullable
+  public MaxLengthExceedStrategy getMaxLengthExceedStrategy() {
+    return _maxLengthExceedStrategy;
+  }
+
+  // Required by JSON de-serializer. DO NOT REMOVE.
+  public void setMaxLengthExceedStrategy(@Nullable MaxLengthExceedStrategy 
maxLengthExceedStrategy) {
+    _maxLengthExceedStrategy = maxLengthExceedStrategy;
+  }
+
   public String getVirtualColumnProvider() {
     return _virtualColumnProvider;
   }
@@ -411,6 +436,7 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
         .isEqual(_isSingleValueField, that._isSingleValueField) && 
EqualityUtils
         .isEqual(getStringValue(_defaultNullValue), 
getStringValue(that._defaultNullValue)) && EqualityUtils
         .isEqual(_maxLength, that._maxLength) && 
EqualityUtils.isEqual(_transformFunction, that._transformFunction)
+        && EqualityUtils.isEqual(_maxLengthExceedStrategy, 
that._maxLengthExceedStrategy)
         && EqualityUtils.isEqual(_virtualColumnProvider, 
that._virtualColumnProvider)
         && EqualityUtils.isEqual(_notNull, that._notNull);
   }
@@ -422,6 +448,7 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
     result = EqualityUtils.hashCodeOf(result, _isSingleValueField);
     result = EqualityUtils.hashCodeOf(result, 
getStringValue(_defaultNullValue));
     result = EqualityUtils.hashCodeOf(result, _maxLength);
+    result = EqualityUtils.hashCodeOf(result, _maxLengthExceedStrategy);
     result = EqualityUtils.hashCodeOf(result, _transformFunction);
     result = EqualityUtils.hashCodeOf(result, _virtualColumnProvider);
     result = EqualityUtils.hashCodeOf(result, _notNull);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java
index f50d851eba..32df61efb7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java
@@ -58,6 +58,6 @@ public final class MetricFieldSpec extends FieldSpec {
   @Override
   public String toString() {
     return "< field type: METRIC, field name: " + _name + ", data type: " + 
_dataType + ", default null value: "
-        + _defaultNullValue + " >";
+        + _defaultNullValue + ", max length exceed strategy: " + 
_maxLengthExceedStrategy + " >";
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 2bc15a0800..e4e46fa4ca 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -75,6 +75,8 @@ public class GenericRow implements Serializable {
    */
   public static final String INCOMPLETE_RECORD_KEY = "$INCOMPLETE_RECORD_KEY$";
 
+  public static final String SANITIZED_RECORD_KEY = "$SANITIZED_RECORD_KEY$";
+
   private final Map<String, Object> _fieldToValueMap = new HashMap<>();
   private final Set<String> _nullValueFields = new HashSet<>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to