This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9822100fc0 Add config to skip record ingestion on string column length exceeding configured max schema length (#13103) 9822100fc0 is described below commit 9822100fc0dd5c6ddb6d5463c7b76d16a0e4cd1e Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Tue Jun 4 22:44:49 2024 +0530 Add config to skip record ingestion on string column length exceeding configured max schema length (#13103) --- .../apache/pinot/common/metrics/ServerMeter.java | 1 + .../apache/pinot/common/data/FieldSpecTest.java | 3 +- .../realtime/RealtimeSegmentDataManager.java | 6 + .../recordtransformer/SanitizationTransformer.java | 190 ++++++++++++++++--- .../local/segment/creator/TransformPipeline.java | 12 ++ .../recordtransformer/RecordTransformerTest.java | 209 ++++++++++++++++++++- .../apache/pinot/spi/data/DimensionFieldSpec.java | 8 +- .../java/org/apache/pinot/spi/data/FieldSpec.java | 27 +++ .../org/apache/pinot/spi/data/MetricFieldSpec.java | 2 +- .../apache/pinot/spi/data/readers/GenericRow.java | 2 + 10 files changed, 434 insertions(+), 26 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index a51368a33f..516584950d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -35,6 +35,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { DELETED_SEGMENT_COUNT("segments", false), DELETE_TABLE_FAILURES("tables", false), REALTIME_ROWS_CONSUMED("rows", true), + REALTIME_ROWS_SANITIZED("rows", true), REALTIME_ROWS_FETCHED("rows", false), REALTIME_ROWS_FILTERED("rows", false), INVALID_REALTIME_ROWS_DROPPED("rows", false), diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java index 91641e1afa..98f03b2ed9 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java @@ -312,10 +312,11 @@ public class FieldSpecTest { // Multi-value dimension field with default null value. dimensionFields = new String[]{ "\"name\":\"dimension\"", "\"dataType\":\"STRING\"", "\"singleValueField\":false", - "\"defaultNullValue\":\"default\"" + "\"defaultNullValue\":\"default\", \"maxLengthExceedStrategy\": \"TRIM_LENGTH\"" }; dimensionFieldSpec1 = JsonUtils.stringToObject(getRandomOrderJsonString(dimensionFields), DimensionFieldSpec.class); dimensionFieldSpec2 = new DimensionFieldSpec("dimension", STRING, false, "default"); + dimensionFieldSpec2.setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH); Assert.assertEquals(dimensionFieldSpec1, dimensionFieldSpec2, ERROR_MESSAGE); Assert.assertEquals(dimensionFieldSpec1.getDefaultNullValue(), "default", ERROR_MESSAGE); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index bcc3223141..f50942c588 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -540,6 +540,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { PinotMeter realtimeRowsConsumedMeter = null; PinotMeter realtimeRowsDroppedMeter = null; PinotMeter realtimeIncompleteRowsConsumedMeter = null; + PinotMeter realtimeRowsSanitizedMeter = null; int indexedMessageCount = 0; int streamMessageCount = 0; @@ -625,6 +626,11 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED, reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter); } + if (reusedResult.getSanitizedRowCount() > 0) { + realtimeRowsSanitizedMeter = + _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_SANITIZED, + reusedResult.getSanitizedRowCount(), realtimeRowsSanitizedMeter); + } List<GenericRow> transformedRows = reusedResult.getTransformedRows(); for (GenericRow transformedRow : transformedRows) { try { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java index beb8098fba..eb407fbfc2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SanitizationTransformer.java @@ -18,10 +18,11 @@ */ package org.apache.pinot.segment.local.recordtransformer; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.StringUtil; @@ -32,51 +33,196 @@ import org.apache.pinot.spi.utils.StringUtil; * <ul> * <li>No {@code null} characters in string values</li> * <li>String values are within the length limit</li> - * TODO: add length limit to BYTES values if necessary * </ul> * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all values follow the data types in * {@link FieldSpec}. + * This uses the MaxLengthExceedStrategy in the {@link FieldSpec} to decide what to do when the value exceeds the max. + * For TRIM_LENGTH, the value is trimmed to the max length. + * For SUBSTITUTE_DEFAULT_VALUE, the value is replaced with the default null value string. + * For ERROR, an exception is thrown and the record is skipped. + * For NO_ACTION, the value is kept as is if no NULL_CHARACTER present else trimmed till NULL. + * In the first 2 scenarios, this metric REALTIME_ROWS_SANITIZED can be tracked to know if a trimmed / + * default record was persisted. + * In the third scenario, this metric ROWS_WITH_ERRORS can be tracked to know if a record was skipped. + * In the last scenario, this metric REALTIME_ROWS_SANITIZED can be tracked to know if a record was trimmed + * due to having a null character. */ public class SanitizationTransformer implements RecordTransformer { - private final Map<String, Integer> _stringColumnMaxLengthMap = new HashMap<>(); + private static final String NULL_CHARACTER = "\0"; + private final Map<String, SanitizedColumnInfo> _columnToColumnInfoMap = new HashMap<>(); public SanitizationTransformer(Schema schema) { + FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy; for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { - if (!fieldSpec.isVirtualColumn() && fieldSpec.getDataType() == DataType.STRING) { - _stringColumnMaxLengthMap.put(fieldSpec.getName(), fieldSpec.getMaxLength()); + if (!fieldSpec.isVirtualColumn()) { + if (fieldSpec.getDataType().equals(FieldSpec.DataType.STRING)) { + maxLengthExceedStrategy = + fieldSpec.getMaxLengthExceedStrategy() == null ? FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH + : fieldSpec.getMaxLengthExceedStrategy(); + _columnToColumnInfoMap.put(fieldSpec.getName(), new SanitizedColumnInfo(fieldSpec.getName(), + fieldSpec.getMaxLength(), maxLengthExceedStrategy, fieldSpec.getDefaultNullValue())); + } else if (fieldSpec.getDataType().equals(FieldSpec.DataType.JSON) || fieldSpec.getDataType() + .equals(FieldSpec.DataType.BYTES)) { + maxLengthExceedStrategy = fieldSpec.getMaxLengthExceedStrategy() == null + ? FieldSpec.MaxLengthExceedStrategy.NO_ACTION : fieldSpec.getMaxLengthExceedStrategy(); + if (maxLengthExceedStrategy.equals(FieldSpec.MaxLengthExceedStrategy.NO_ACTION)) { + continue; + } + _columnToColumnInfoMap.put(fieldSpec.getName(), new SanitizedColumnInfo(fieldSpec.getName(), + fieldSpec.getMaxLength(), fieldSpec.getMaxLengthExceedStrategy(), fieldSpec.getDefaultNullValue())); + } } } } @Override public boolean isNoOp() { - return _stringColumnMaxLengthMap.isEmpty(); + return _columnToColumnInfoMap.isEmpty(); } @Override public GenericRow transform(GenericRow record) { - for (Map.Entry<String, Integer> entry : _stringColumnMaxLengthMap.entrySet()) { - String stringColumn = entry.getKey(); - int maxLength = entry.getValue(); - Object value = record.getValue(stringColumn); - if (value instanceof String) { - // Single-valued column - String stringValue = (String) value; - String sanitizedValue = StringUtil.sanitizeStringValue(stringValue, maxLength); - // NOTE: reference comparison - //noinspection StringEquality - if (sanitizedValue != stringValue) { - record.putValue(stringColumn, sanitizedValue); + for (Map.Entry<String, SanitizedColumnInfo> entry : _columnToColumnInfoMap.entrySet()) { + String columnName = entry.getKey(); + Object value = record.getValue(columnName); + Pair<?, Boolean> result; + if (value instanceof byte[]) { + // Single-values BYTES column + result = sanitizeBytesValue(columnName, (byte[]) value, entry.getValue()); + record.putValue(columnName, result.getLeft()); + if (result.getRight()) { + record.putValue(GenericRow.SANITIZED_RECORD_KEY, true); + } + } else if (value instanceof String) { + // Single-valued String column + result = sanitizeValue(columnName, (String) value, entry.getValue()); + record.putValue(columnName, result.getLeft()); + if (result.getRight()) { + record.putValue(GenericRow.SANITIZED_RECORD_KEY, true); } } else { - // Multi-valued column + // Multi-valued String / BYTES column Object[] values = (Object[]) value; - int numValues = values.length; - for (int i = 0; i < numValues; i++) { - values[i] = StringUtil.sanitizeStringValue(values[i].toString(), maxLength); + for (int i = 0; i < values.length; i++) { + if (values[i] instanceof byte[]) { + result = sanitizeBytesValue(columnName, (byte[]) values[i], entry.getValue()); + } else { + result = sanitizeValue(columnName, values[i].toString(), entry.getValue()); + } + values[i] = result.getLeft(); + if (result.getRight()) { + record.putValue(GenericRow.SANITIZED_RECORD_KEY, true); + } } } } return record; } + + /** + * Sanitize the value for the given column. + * @param columnName column name + * @param value value of the column + * @param sanitizedColumnInfo metadata from field spec of the column defined in schema + * @return the sanitized value and a boolean indicating if the value was sanitized + */ + private Pair<String, Boolean> sanitizeValue(String columnName, String value, + SanitizedColumnInfo sanitizedColumnInfo) { + String sanitizedValue = StringUtil.sanitizeStringValue(value, sanitizedColumnInfo.getMaxLength()); + FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = sanitizedColumnInfo.getMaxLengthExceedStrategy(); + int index; + // NOTE: reference comparison + // noinspection StringEquality + if (sanitizedValue != value) { + switch (maxLengthExceedStrategy) { + case TRIM_LENGTH: + return Pair.of(sanitizedValue, true); + case SUBSTITUTE_DEFAULT_VALUE: + return Pair.of(FieldSpec.getStringValue(sanitizedColumnInfo.getDefaultNullValue()), true); + case ERROR: + index = value.indexOf(NULL_CHARACTER); + if (index < 0) { + throw new IllegalStateException( + String.format("Throwing exception as value: %s for column %s exceeds configured max length %d.", value, + columnName, sanitizedColumnInfo.getMaxLength())); + } else { + throw new IllegalStateException( + String.format("Throwing exception as value: %s for column %s contains null character.", value, + columnName)); + } + case NO_ACTION: + index = value.indexOf(NULL_CHARACTER); + if (index < 0) { + return Pair.of(value, false); + } else { + return Pair.of(sanitizedValue, true); + } + default: + throw new IllegalStateException( + "Unsupported max length exceed strategy: " + sanitizedColumnInfo.getMaxLengthExceedStrategy()); + } + } + return Pair.of(sanitizedValue, false); + } + + /** + * Sanitize the value for the given column. + * @param columnName column name + * @param value value of the column + * @param sanitizedColumnInfo metadata from field spec of the column defined in schema + * @return the sanitized value and a boolean indicating if the value was sanitized + */ + private Pair<byte[], Boolean> sanitizeBytesValue(String columnName, byte[] value, + SanitizedColumnInfo sanitizedColumnInfo) { + if (value.length > sanitizedColumnInfo.getMaxLength()) { + FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = sanitizedColumnInfo.getMaxLengthExceedStrategy(); + switch (maxLengthExceedStrategy) { + case TRIM_LENGTH: + return Pair.of(Arrays.copyOf(value, sanitizedColumnInfo.getMaxLength()), true); + case SUBSTITUTE_DEFAULT_VALUE: + return Pair.of((byte[]) sanitizedColumnInfo.getDefaultNullValue(), true); + case ERROR: + throw new IllegalStateException( + String.format("Throwing exception as value for column %s exceeds configured max length %d.", columnName, + sanitizedColumnInfo.getMaxLength())); + case NO_ACTION: + return Pair.of(value, false); + default: + throw new IllegalStateException( + "Unsupported max length exceed strategy: " + sanitizedColumnInfo.getMaxLengthExceedStrategy()); + } + } + return Pair.of(value, false); + } + + private static class SanitizedColumnInfo { + private final String _columnName; + private final int _maxLength; + private final FieldSpec.MaxLengthExceedStrategy _maxLengthExceedStrategy; + private final Object _defaultNullValue; + + private SanitizedColumnInfo(String columnName, int maxLength, + FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy, Object defaultNullValue) { + _columnName = columnName; + _maxLength = maxLength; + _maxLengthExceedStrategy = maxLengthExceedStrategy; + _defaultNullValue = defaultNullValue; + } + + public String getColumnName() { + return _columnName; + } + + public int getMaxLength() { + return _maxLength; + } + + public FieldSpec.MaxLengthExceedStrategy getMaxLengthExceedStrategy() { + return _maxLengthExceedStrategy; + } + + public Object getDefaultNullValue() { + return _defaultNullValue; + } + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java index 4ab27ffc46..f594c4d00e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java @@ -100,6 +100,9 @@ public class TransformPipeline { if (Boolean.TRUE.equals(transformedRow.getValue(GenericRow.INCOMPLETE_RECORD_KEY))) { reusedResult.incIncompleteRowCount(); } + if (Boolean.TRUE.equals(transformedRow.getValue(GenericRow.SANITIZED_RECORD_KEY))) { + reusedResult.incSanitizedRowCount(); + } } else { reusedResult.incSkippedRowCount(); } @@ -112,6 +115,7 @@ public class TransformPipeline { private final List<GenericRow> _transformedRows = new ArrayList<>(); private int _skippedRowCount = 0; private int _incompleteRowCount = 0; + private int _sanitizedRowCount = 0; public List<GenericRow> getTransformedRows() { return _transformedRows; @@ -137,6 +141,14 @@ public class TransformPipeline { _incompleteRowCount++; } + public void incSanitizedRowCount() { + _sanitizedRowCount++; + } + + public int getSanitizedRowCount() { + return _sanitizedRowCount; + } + public void reset() { _skippedRowCount = 0; _incompleteRowCount = 0; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java index 46a4181a2b..696f2b1f48 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java @@ -36,6 +36,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; import org.testng.annotations.Test; @@ -260,7 +261,9 @@ public class RecordTransformerTest { } @Test - public void testSanitationTransformer() { + public void testSanitizationTransformer() { + // scenario where string contains null and exceeds max length + // and fieldSpec maxLengthExceedStrategy is default (TRIM_LENGTH) RecordTransformer transformer = new SanitizationTransformer(SCHEMA); GenericRow record = getRecord(); for (int i = 0; i < NUM_ROUNDS; i++) { @@ -272,6 +275,210 @@ public class RecordTransformerTest { assertEquals(record.getValue("mvString2"), new Object[]{"123", "123", "123.0", "123.0", "123"}); assertNull(record.getValue("$virtual")); assertTrue(record.getNullValueFields().isEmpty()); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where string contains null and fieldSpec maxLengthExceedStrategy is to ERROR + Schema schema = SCHEMA; + schema.getFieldSpecFor("svStringWithNullCharacters") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.ERROR); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + try { + record = transformer.transform(record); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals(e.getMessage(), "Throwing exception as value: 1\0002\0003 for column " + + "svStringWithNullCharacters contains null character."); + } + } + + // scenario where string contains null and fieldSpec maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE + schema = SCHEMA; + schema.getFieldSpecFor("svStringWithNullCharacters") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svStringWithNullCharacters"), "null"); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where string exceeds max length and fieldSpec maxLengthExceedStrategy is to ERROR + schema = SCHEMA; + schema.getFieldSpecFor("svStringWithLengthLimit") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.ERROR); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + try { + record = transformer.transform(record); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals(e.getMessage(), "Throwing exception as value: 123 for column svStringWithLengthLimit " + + "exceeds configured max length 2."); + } + } + + // scenario where string exceeds max length and fieldSpec maxLengthExceedStrategy is to SUBSTITUTE_DEFAULT_VALUE + schema = SCHEMA; + schema.getFieldSpecFor("svStringWithLengthLimit") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svStringWithLengthLimit"), "null"); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where string exceeds max length and fieldSpec maxLengthExceedStrategy is to NO_ACTION + schema = SCHEMA; + schema.getFieldSpecFor("svStringWithLengthLimit") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svStringWithLengthLimit"), "123"); + } + + // scenario where string contains null and fieldSpec maxLengthExceedStrategy is to NO_ACTION + schema = SCHEMA; + schema.getFieldSpecFor("svStringWithNullCharacters") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svStringWithNullCharacters"), "1"); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where json field exceeds max length and fieldSpec maxLengthExceedStrategy is to NO_ACTION + schema = SCHEMA; + schema.getFieldSpecFor("svJson").setMaxLength(10); + schema.getFieldSpecFor("svJson") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svJson"), "{\"first\": \"daffy\", \"last\": \"duck\"}"); + } + + // scenario where json field exceeds max length and fieldSpec maxLengthExceedStrategy is to TRIM_LENGTH + schema = SCHEMA; + schema.getFieldSpecFor("svJson").setMaxLength(10); + schema.getFieldSpecFor("svJson") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svJson"), "{\"first\": "); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where json field exceeds max length and fieldSpec maxLengthExceedStrategy is to + // SUBSTITUTE_DEFAULT_VALUE + schema = SCHEMA; + schema.getFieldSpecFor("svJson").setMaxLength(10); + schema.getFieldSpecFor("svJson") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svJson"), "null"); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where json field exceeds max length and fieldSpec maxLengthExceedStrategy is to + // SUBSTITUTE_DEFAULT_VALUE + schema = SCHEMA; + schema.getFieldSpecFor("svJson").setMaxLength(10); + schema.getFieldSpecFor("svJson") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + try { + record = transformer.transform(record); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals(e.getMessage(), "Throwing exception as value: " + + "{\"first\": \"daffy\", \"last\": \"duck\"} for column " + + "svJson exceeds configured max length 10."); + } + } + + // scenario where bytes field exceeds max length and fieldSpec maxLengthExceedStrategy is to NO_ACTION + schema = SCHEMA; + schema.getFieldSpecFor("svBytes").setMaxLength(2); + schema.getFieldSpecFor("svBytes") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.NO_ACTION); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svBytes"), "7b7b"); + } + + // scenario where bytes field exceeds max length and fieldSpec maxLengthExceedStrategy is to TRIM_LENGTH + schema = SCHEMA; + schema.getFieldSpecFor("svBytes").setMaxLength(2); + schema.getFieldSpecFor("svBytes") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.TRIM_LENGTH); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svBytes"), "7b"); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where bytes field exceeds max length and fieldSpec maxLengthExceedStrategy is to + // SUBSTITUTE_DEFAULT_VALUE + schema = SCHEMA; + schema.getFieldSpecFor("svBytes").setMaxLength(2); + schema.getFieldSpecFor("svBytes") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.SUBSTITUTE_DEFAULT_VALUE); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svBytes"), BytesUtils.toHexString(new byte[0])); + assertTrue(record.getFieldToValueMap().containsKey(GenericRow.SANITIZED_RECORD_KEY)); + } + + // scenario where bytes field exceeds max length and fieldSpec maxLengthExceedStrategy is to ERROR + schema = SCHEMA; + schema.getFieldSpecFor("svBytes").setMaxLength(2); + schema.getFieldSpecFor("svBytes") + .setMaxLengthExceedStrategy(FieldSpec.MaxLengthExceedStrategy.ERROR); + transformer = new SanitizationTransformer(schema); + record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + try { + record = transformer.transform(record); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals(e.getMessage(), "Throwing exception as value: 7b7b for column svBytes " + + "exceeds configured max length 2."); + } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java index 11545a0278..0637abd7fa 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DimensionFieldSpec.java @@ -45,6 +45,11 @@ public final class DimensionFieldSpec extends FieldSpec { super(name, dataType, isSingleValueField, maxLength, defaultNullValue); } + public DimensionFieldSpec(String name, DataType dataType, boolean isSingleValueField, int maxLength, + @Nullable Object defaultNullValue, @Nullable MaxLengthExceedStrategy maxLengthExceedStrategy) { + super(name, dataType, isSingleValueField, maxLength, defaultNullValue, maxLengthExceedStrategy); + } + public DimensionFieldSpec(String name, DataType dataType, boolean isSingleValueField, Class virtualColumnProviderClass) { super(name, dataType, isSingleValueField); @@ -66,6 +71,7 @@ public final class DimensionFieldSpec extends FieldSpec { @Override public String toString() { return "< field type: DIMENSION, field name: " + _name + ", data type: " + _dataType + ", is single-value field: " - + _isSingleValueField + ", default null value: " + _defaultNullValue + " >"; + + _isSingleValueField + ", default null value: " + _defaultNullValue + ", max length exceed strategy: " + + _maxLengthExceedStrategy + " >"; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java index d5f3f7e403..5fa586b6b9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java @@ -46,6 +46,8 @@ import org.apache.pinot.spi.utils.TimestampUtils; * <p>- <code>DefaultNullValue</code>: when no value found for this field, use this value. Stored in string format. * <p>- <code>VirtualColumnProvider</code>: the virtual column provider to use for this field. * <p>- <code>NotNull</code>: whether the column accepts nulls or not. Defaults to false. + * <p>- <code>MaxLength</code>: the maximum length of the string column. Defaults to 512. + * <p>- <code>MaxLengthExceedStrategy</code>: the strategy to handle the case when the string column exceeds the max */ @SuppressWarnings("unused") public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { @@ -98,6 +100,10 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { } } + public enum MaxLengthExceedStrategy { + TRIM_LENGTH, ERROR, SUBSTITUTE_DEFAULT_VALUE, NO_ACTION + } + protected String _name; protected DataType _dataType; protected boolean _isSingleValueField = true; @@ -106,6 +112,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { // NOTE: This only applies to STRING column, which is the max number of characters private int _maxLength = DEFAULT_MAX_LENGTH; + // NOTE: This only applies to STRING column during {@link SanitizationTransformer} + protected MaxLengthExceedStrategy _maxLengthExceedStrategy; + protected Object _defaultNullValue; private transient String _stringDefaultNullValue; @@ -129,11 +138,17 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { public FieldSpec(String name, DataType dataType, boolean isSingleValueField, int maxLength, @Nullable Object defaultNullValue) { + this(name, dataType, isSingleValueField, maxLength, defaultNullValue, null); + } + + public FieldSpec(String name, DataType dataType, boolean isSingleValueField, int maxLength, + @Nullable Object defaultNullValue, @Nullable MaxLengthExceedStrategy maxLengthExceedStrategy) { _name = name; _dataType = dataType; _isSingleValueField = isSingleValueField; _maxLength = maxLength; setDefaultNullValue(defaultNullValue); + _maxLengthExceedStrategy = maxLengthExceedStrategy; } public abstract FieldType getFieldType(); @@ -175,6 +190,16 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { _maxLength = maxLength; } + @Nullable + public MaxLengthExceedStrategy getMaxLengthExceedStrategy() { + return _maxLengthExceedStrategy; + } + + // Required by JSON de-serializer. DO NOT REMOVE. + public void setMaxLengthExceedStrategy(@Nullable MaxLengthExceedStrategy maxLengthExceedStrategy) { + _maxLengthExceedStrategy = maxLengthExceedStrategy; + } + public String getVirtualColumnProvider() { return _virtualColumnProvider; } @@ -411,6 +436,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { .isEqual(_isSingleValueField, that._isSingleValueField) && EqualityUtils .isEqual(getStringValue(_defaultNullValue), getStringValue(that._defaultNullValue)) && EqualityUtils .isEqual(_maxLength, that._maxLength) && EqualityUtils.isEqual(_transformFunction, that._transformFunction) + && EqualityUtils.isEqual(_maxLengthExceedStrategy, that._maxLengthExceedStrategy) && EqualityUtils.isEqual(_virtualColumnProvider, that._virtualColumnProvider) && EqualityUtils.isEqual(_notNull, that._notNull); } @@ -422,6 +448,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { result = EqualityUtils.hashCodeOf(result, _isSingleValueField); result = EqualityUtils.hashCodeOf(result, getStringValue(_defaultNullValue)); result = EqualityUtils.hashCodeOf(result, _maxLength); + result = EqualityUtils.hashCodeOf(result, _maxLengthExceedStrategy); result = EqualityUtils.hashCodeOf(result, _transformFunction); result = EqualityUtils.hashCodeOf(result, _virtualColumnProvider); result = EqualityUtils.hashCodeOf(result, _notNull); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java index f50d851eba..32df61efb7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/MetricFieldSpec.java @@ -58,6 +58,6 @@ public final class MetricFieldSpec extends FieldSpec { @Override public String toString() { return "< field type: METRIC, field name: " + _name + ", data type: " + _dataType + ", default null value: " - + _defaultNullValue + " >"; + + _defaultNullValue + ", max length exceed strategy: " + _maxLengthExceedStrategy + " >"; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index 2bc15a0800..e4e46fa4ca 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -75,6 +75,8 @@ public class GenericRow implements Serializable { */ public static final String INCOMPLETE_RECORD_KEY = "$INCOMPLETE_RECORD_KEY$"; + public static final String SANITIZED_RECORD_KEY = "$SANITIZED_RECORD_KEY$"; + private final Map<String, Object> _fieldToValueMap = new HashMap<>(); private final Set<String> _nullValueFields = new HashSet<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org