This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7522d8a206 Support ComplexFieldSpec in Schema and column metadata (#13905) 7522d8a206 is described below commit 7522d8a20663100c5e1cfa7402014fb55164d112 Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Thu Aug 29 21:23:51 2024 -0700 Support ComplexFieldSpec in Schema and column metadata (#13905) --- .../apache/pinot/core/util/SchemaUtilsTest.java | 48 +++++++- .../local/function/FunctionEvaluatorFactory.java | 2 +- .../creator/impl/SegmentColumnarIndexCreator.java | 68 ++++++++++- .../pinot/segment/local/utils/SchemaUtils.java | 4 +- .../local/segment/index/ColumnMetadataTest.java | 24 ++++ .../org/apache/pinot/segment/spi/V1Constants.java | 3 + .../spi/creator/SegmentGeneratorConfig.java | 4 + .../spi/index/metadata/ColumnMetadataImpl.java | 126 ++++++++++++--------- .../spi/index/metadata/SegmentMetadataImpl.java | 1 + .../apache/pinot/spi/data/ComplexFieldSpec.java | 79 +++++++++++-- .../java/org/apache/pinot/spi/data/FieldSpec.java | 54 +++++++++ .../java/org/apache/pinot/spi/data/Schema.java | 43 ++++++- 12 files changed, 384 insertions(+), 72 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java index e1dd69f771..8a62f79251 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java @@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; @@ -292,10 +293,10 @@ public class SchemaUtilsTest { @Test public void testValidateCaseInsensitive() { - Schema pinotSchema; - pinotSchema = - new Schema.SchemaBuilder().addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "incoming"), - new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "outgoing")) + Schema pinotSchema = new Schema.SchemaBuilder() + .addTime( + new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "incoming"), + new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "outgoing")) .addSingleValueDimension("dim1", DataType.INT) .addSingleValueDimension("Dim1", DataType.INT) .build(); @@ -471,6 +472,45 @@ public class SchemaUtilsTest { checkValidationFails(pinotSchema); } + @Test + public void testComplexFieldSpec() + throws Exception { + Schema pinotSchema; + // valid schema + pinotSchema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("name", DataType.STRING) + .addComplex("intMap", DataType.MAP, Map.of( + "key", new DimensionFieldSpec("key", DataType.STRING, true), + "value", new DimensionFieldSpec("value", DataType.INT, true) + )) + .addComplex("stringMap", DataType.MAP, Map.of( + "key", new DimensionFieldSpec("key", DataType.STRING, true), + "value", new DimensionFieldSpec("value", DataType.STRING, true) + )) + .build(); + SchemaUtils.validate(pinotSchema); + String schemaStr = pinotSchema.toString(); + Schema deserSchema = Schema.fromString(schemaStr); + Assert.assertEquals(pinotSchema.getSchemaName(), deserSchema.getSchemaName()); + Assert.assertEquals(pinotSchema.getDimensionNames(), deserSchema.getDimensionNames()); + Assert.assertEquals(pinotSchema.getMetricNames(), deserSchema.getMetricNames()); + Assert.assertEquals(pinotSchema.getTimeFieldSpec(), deserSchema.getTimeFieldSpec()); + Assert.assertEquals(pinotSchema.getComplexFieldSpecs().size(), deserSchema.getComplexFieldSpecs().size()); + Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(0).getChildFieldSpecs().size(), + deserSchema.getComplexFieldSpecs().get(0).getChildFieldSpecs().size()); + Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD), + deserSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD)); + Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD), + deserSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD)); + + Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(1).getChildFieldSpecs().size(), + deserSchema.getComplexFieldSpecs().get(1).getChildFieldSpecs().size()); + Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD), + deserSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD)); + Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD), + deserSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD)); + } + private void checkValidationFails(Schema pinotSchema, boolean isIgnoreCase) { try { SchemaUtils.validate(pinotSchema, isIgnoreCase); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java index 45aa07ebbe..49a62b2274 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java @@ -64,7 +64,7 @@ public class FunctionEvaluatorFactory { "Caught exception while constructing expression evaluator for transform expression:" + transformExpression + ", of column:" + columnName); } - } else if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.TIME)) { + } else if (fieldSpec.getFieldType() == FieldSpec.FieldType.TIME) { // Time conversions should be done using DateTimeFieldSpec and transformFunctions // But we need below lines for converting TimeFieldSpec's incoming to outgoing diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index a422301f96..749c4cf704 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -24,6 +24,7 @@ import com.google.common.collect.Maps; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,6 +61,7 @@ import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.spi.config.table.IndexConfig; import org.apache.pinot.spi.config.table.SegmentZKPropsConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; @@ -285,7 +287,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { String column = spec.getName(); boolean createDictionary = false; if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType() - .containsKey(column)) { + .containsKey(column) || spec instanceof ComplexFieldSpec) { return createDictionary; } @@ -478,6 +480,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { properties.setProperty(DIMENSIONS, _config.getDimensions()); properties.setProperty(METRICS, _config.getMetrics()); properties.setProperty(DATETIME_COLUMNS, _config.getDateTimeColumnNames()); + properties.setProperty(COMPLEX_COLUMNS, _config.getComplexColumnNames()); String timeColumnName = _config.getTimeColumnName(); properties.setProperty(TIME_COLUMN_NAME, timeColumnName); properties.setProperty(SEGMENT_TOTAL_DOCS, String.valueOf(_totalDocs)); @@ -610,14 +613,24 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } // datetime field - if (fieldSpec.getFieldType().equals(FieldType.DATE_TIME)) { + if (fieldSpec.getFieldType() == FieldType.DATE_TIME) { DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec; properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat()); properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity()); } + // complex field + if (fieldSpec.getFieldType() == FieldType.COMPLEX) { + ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec; + properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES), + new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet())); + for (Map.Entry<String, FieldSpec> entry : complexFieldSpec.getChildFieldSpecs().entrySet()) { + addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, entry.getKey()), entry.getValue()); + } + } + // NOTE: Min/max could be null for real-time aggregate metrics. - if (totalDocs > 0) { + if ((fieldSpec.getFieldType() != FieldType.COMPLEX) && (totalDocs > 0)) { Object min = columnIndexCreationInfo.getMin(); Object max = columnIndexCreationInfo.getMax(); if (min != null && max != null) { @@ -636,6 +649,55 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } } + /** + * In order to persist complex field metadata, we need to recursively add child field specs + * So, each complex field spec will have a property for its child field names and each child field will have its + * own properties of the detailed field spec. + * E.g. a COMPLEX type `intMap` of Map<String, Integer> has 2 child fields: + * - key in STRING type and value in INT type. + * Then we will have the following properties to define a COMPLEX field: + * column.intMap.childFieldNames = [key, value] + * column.intMap$$key.columnType = DIMENSION + * column.intMap$$key.dataType = STRING + * column.intMap$$key.isSingleValued = true + * column.intMap$$value.columnType = DIMENSION + * column.intMap$$value.dataType = INT + * column.intMap$$value.isSingleValued = true + */ + public static void addFieldSpec(PropertiesConfiguration properties, String column, FieldSpec fieldSpec) { + properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType())); + if (!column.equals(fieldSpec.getName())) { + properties.setProperty(getKeyFor(column, COLUMN_NAME), String.valueOf(fieldSpec.getName())); + } + DataType dataType = fieldSpec.getDataType(); + properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType)); + properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField())); + if (dataType.equals(DataType.STRING) || dataType.equals(DataType.BYTES) || dataType.equals(DataType.JSON)) { + properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH), fieldSpec.getMaxLength()); + FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = fieldSpec.getMaxLengthExceedStrategy(); + if (maxLengthExceedStrategy != null) { + properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), maxLengthExceedStrategy); + } + } + + // datetime field + if (fieldSpec.getFieldType() == FieldType.DATE_TIME) { + DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec; + properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat()); + properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity()); + } + + // complex field + if (fieldSpec.getFieldType() == FieldType.COMPLEX) { + ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec; + properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES), + new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet())); + for (Map.Entry<String, FieldSpec> entry : complexFieldSpec.getChildFieldSpecs().entrySet()) { + addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, entry.getKey()), entry.getValue()); + } + } + } + public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column, @Nullable Object minValue, @Nullable Object maxValue, DataType storedType) { String validMinValue = minValue != null ? getValidPropertyValue(minValue.toString(), storedType) : null; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java index 6a060470bf..9661923d30 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java @@ -138,10 +138,10 @@ public class SchemaUtils { + column + "'", e); } } - if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.TIME)) { + if (fieldSpec.getFieldType() == FieldSpec.FieldType.TIME) { validateTimeFieldSpec((TimeFieldSpec) fieldSpec); } - if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.DATE_TIME)) { + if (fieldSpec.getFieldType() == FieldSpec.FieldType.DATE_TIME) { validateDateTimeFieldSpec((DateTimeFieldSpec) fieldSpec); } if (fieldSpec.getDataType().equals(FieldSpec.DataType.FLOAT) || fieldSpec.getDataType() diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java index dbee83216c..b9168e49bd 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java @@ -30,9 +30,13 @@ import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory; +import org.apache.pinot.segment.local.segment.index.loader.defaultcolumn.DefaultColumnStatistics; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl; @@ -40,6 +44,7 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.partition.BoundedColumnValuePartitionFunction; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.data.ComplexFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.env.CommonsConfigurationUtils; @@ -49,6 +54,8 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER; + public class ColumnMetadataTest { private static final String AVRO_DATA = "data/test_data-mv.avro"; @@ -220,4 +227,21 @@ public class ColumnMetadataTest { Assert.assertEquals(installationOutput.getMinValue(), "\r\n\r\n utils em::C:\\dir\\utils\r\nPSParentPath : Mi"); } + + @Test + public void testComplexFieldSpec() { + ComplexFieldSpec intMapFieldSpec = new ComplexFieldSpec("intMap", DataType.MAP, true, Map.of( + "key", new DimensionFieldSpec("key", DataType.STRING, true), + "value", new DimensionFieldSpec("value", DataType.INT, true) + )); + ColumnIndexCreationInfo columnIndexCreationInfo = + new ColumnIndexCreationInfo(new DefaultColumnStatistics(null, null, null, false, 1, 1), false, false, false, + Map.of()); + PropertiesConfiguration config = new PropertiesConfiguration(); + config.setProperty(SEGMENT_PADDING_CHARACTER, String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR)); + SegmentColumnarIndexCreator.addColumnMetadataInfo(config, "intMap", columnIndexCreationInfo, 1, intMapFieldSpec, + false, -1); + ColumnMetadataImpl intMapColumnMetadata = ColumnMetadataImpl.fromPropertiesConfiguration("intMap", config); + Assert.assertEquals(intMapColumnMetadata.getFieldSpec(), intMapFieldSpec); + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java index 3ecec032ad..07988b5be7 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java @@ -85,6 +85,7 @@ public class V1Constants { public static final String DATETIME_COLUMNS = "segment.datetime.column.names"; public static final String SEGMENT_TOTAL_DOCS = "segment.total.docs"; public static final String SEGMENT_PADDING_CHARACTER = "segment.padding.character"; + public static final String COMPLEX_COLUMNS = "segment.complex.column.names"; public static final String CUSTOM_SUBSET = "custom"; @@ -100,6 +101,7 @@ public class V1Constants { public static final String DATA_TYPE = "dataType"; public static final String BITS_PER_ELEMENT = "bitsPerElement"; public static final String DICTIONARY_ELEMENT_SIZE = "lengthOfEachEntry"; + public static final String COLUMN_NAME = "columnName"; public static final String COLUMN_TYPE = "columnType"; public static final String IS_SORTED = "isSorted"; public static final String HAS_DICTIONARY = "hasDictionary"; @@ -117,6 +119,7 @@ public class V1Constants { public static final String PARTITION_VALUES = "partitionValues"; public static final String DATETIME_FORMAT = "datetimeFormat"; public static final String DATETIME_GRANULARITY = "datetimeGranularity"; + public static final String COMPLEX_CHILD_FIELD_NAMES = "complexChildFieldNames"; public static final String COLUMN_PROPS_KEY_PREFIX = "column."; public static final String SCHEMA_MAX_LENGTH = "schemaMaxLength"; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java index a60946908b..4424be0883 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java @@ -669,6 +669,10 @@ public class SegmentGeneratorConfig implements Serializable { return getQualifyingFields(FieldType.DATE_TIME, true); } + public List<String> getComplexColumnNames() { + return getQualifyingFields(FieldType.COMPLEX, true); + } + public void setSegmentPartitionConfig(SegmentPartitionConfig segmentPartitionConfig) { _segmentPartitionConfig = segmentPartitionConfig; } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java index fdd1f5c290..91eed38295 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java @@ -39,11 +39,11 @@ import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; +import org.apache.pinot.spi.data.ComplexFieldSpec; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; -import org.apache.pinot.spi.data.FieldSpec.FieldType; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.spi.data.TimeGranularitySpec; @@ -221,59 +221,32 @@ public class ColumnMetadataImpl implements ColumnMetadata { .setTotalNumberOfEntries(config.getInt(Column.getKeyFor(column, Column.TOTAL_NUMBER_OF_ENTRIES))) .setAutoGenerated(config.getBoolean(Column.getKeyFor(column, Column.IS_AUTO_GENERATED), false)); - FieldType fieldType = - FieldType.valueOf(config.getString(Column.getKeyFor(column, Column.COLUMN_TYPE)).toUpperCase()); - DataType dataType = DataType.valueOf(config.getString(Column.getKeyFor(column, Column.DATA_TYPE)).toUpperCase()); - DataType storedType = dataType.getStoredType(); - String defaultNullValueString = config.getString(Column.getKeyFor(column, Column.DEFAULT_NULL_VALUE), null); - if (defaultNullValueString != null && storedType == DataType.STRING) { - defaultNullValueString = CommonsConfigurationUtils.recoverSpecialCharacterInPropertyValue(defaultNullValueString); - } - int maxLength = config.getInt(Column.getKeyFor(column, Column.SCHEMA_MAX_LENGTH), FieldSpec.DEFAULT_MAX_LENGTH); - String maxLengthExceedStrategyString = - config.getString(Column.getKeyFor(column, Column.SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), null); - FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = maxLengthExceedStrategyString != null - ? FieldSpec.MaxLengthExceedStrategy.valueOf(maxLengthExceedStrategyString) : null; - FieldSpec fieldSpec; - switch (fieldType) { - case DIMENSION: - boolean isSingleValue = config.getBoolean(Column.getKeyFor(column, Column.IS_SINGLE_VALUED)); - fieldSpec = new DimensionFieldSpec(column, dataType, isSingleValue, maxLength, - defaultNullValueString, maxLengthExceedStrategy); - break; - case METRIC: - fieldSpec = new MetricFieldSpec(column, dataType, defaultNullValueString, maxLength, maxLengthExceedStrategy); - break; - case TIME: - TimeUnit timeUnit = TimeUnit.valueOf(config.getString(Segment.TIME_UNIT, "DAYS").toUpperCase()); - fieldSpec = new TimeFieldSpec(new TimeGranularitySpec(dataType, timeUnit, column)); - break; - case DATE_TIME: - String format = config.getString(Column.getKeyFor(column, Column.DATETIME_FORMAT)); - String granularity = config.getString(Column.getKeyFor(column, Column.DATETIME_GRANULARITY)); - fieldSpec = new DateTimeFieldSpec(column, dataType, format, granularity, defaultNullValueString, null); - break; - default: - throw new IllegalStateException("Unsupported field type: " + fieldType); - } + FieldSpec fieldSpec = generateFieldSpec(column, config); builder.setFieldSpec(fieldSpec); + DataType storedType = fieldSpec.getDataType().getStoredType(); + + if (fieldSpec instanceof ComplexFieldSpec) { + // Complex field does not have min/max value + builder.setMinValue(null); + builder.setMaxValue(null); + builder.setMinMaxValueInvalid(true); + } else { + // Set min/max value if available + // NOTE: Use getProperty() instead of getString() to avoid variable substitution ('${anotherKey}'), which can + // cause problem for special values such as '$${' where the first '$' is identified as escape character. + // TODO: Use getProperty() for other properties as well to avoid the overhead of variable substitution + String minString = (String) config.getProperty(Column.getKeyFor(column, Column.MIN_VALUE)); + String maxString = (String) config.getProperty(Column.getKeyFor(column, Column.MAX_VALUE)); + // Set min/max value if available + if (minString != null) { + builder.setMinValue(builder.parseValue(storedType, column, minString)); + } - // Set min/max value if available - // NOTE: Use getProperty() instead of getString() to avoid variable substitution ('${anotherKey}'), which can cause - // problem for special values such as '$${' where the first '$' is identified as escape character. - // TODO: Use getProperty() for other properties as well to avoid the overhead of variable substitution - String minString = (String) config.getProperty(Column.getKeyFor(column, Column.MIN_VALUE)); - String maxString = (String) config.getProperty(Column.getKeyFor(column, Column.MAX_VALUE)); - // Set min/max value if available - if (minString != null) { - builder.setMinValue(builder.parseValue(storedType, column, minString)); - } - - if (maxString != null) { - builder.setMaxValue(builder.parseValue(storedType, column, maxString)); + if (maxString != null) { + builder.setMaxValue(builder.parseValue(storedType, column, maxString)); + } + builder.setMinMaxValueInvalid(config.getBoolean(Column.getKeyFor(column, Column.MIN_MAX_VALUE_INVALID), false)); } - builder.setMinMaxValueInvalid(config.getBoolean(Column.getKeyFor(column, Column.MIN_MAX_VALUE_INVALID), false)); - // Only support zero padding String padding = config.getString(Segment.SEGMENT_PADDING_CHARACTER, null); Preconditions.checkState(String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR) @@ -311,6 +284,57 @@ public class ColumnMetadataImpl implements ColumnMetadata { return builder.build(); } + public static FieldSpec generateFieldSpec(String column, PropertiesConfiguration config) { + String fieldName = config.getString(Column.getKeyFor(column, Column.COLUMN_NAME), column); + FieldSpec.FieldType fieldType = + FieldSpec.FieldType.valueOf(config.getString(Column.getKeyFor(column, Column.COLUMN_TYPE)).toUpperCase()); + DataType dataType = DataType.valueOf(config.getString(Column.getKeyFor(column, Column.DATA_TYPE)).toUpperCase()); + DataType storedType = dataType.getStoredType(); + String defaultNullValueString = config.getString(Column.getKeyFor(column, Column.DEFAULT_NULL_VALUE), null); + if (defaultNullValueString != null && storedType == DataType.STRING) { + defaultNullValueString = CommonsConfigurationUtils.recoverSpecialCharacterInPropertyValue(defaultNullValueString); + } + int maxLength = config.getInt(Column.getKeyFor(column, Column.SCHEMA_MAX_LENGTH), FieldSpec.DEFAULT_MAX_LENGTH); + String maxLengthExceedStrategyString = + config.getString(Column.getKeyFor(column, Column.SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), null); + FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = maxLengthExceedStrategyString != null + ? FieldSpec.MaxLengthExceedStrategy.valueOf(maxLengthExceedStrategyString) : null; + FieldSpec fieldSpec; + switch (fieldType) { + case DIMENSION: + boolean isSingleValue = config.getBoolean(Column.getKeyFor(column, Column.IS_SINGLE_VALUED)); + fieldSpec = new DimensionFieldSpec(fieldName, dataType, isSingleValue, maxLength, + defaultNullValueString, maxLengthExceedStrategy); + break; + case METRIC: + fieldSpec = + new MetricFieldSpec(fieldName, dataType, defaultNullValueString, maxLength, maxLengthExceedStrategy); + break; + case TIME: + TimeUnit timeUnit = TimeUnit.valueOf(config.getString(Segment.TIME_UNIT, "DAYS").toUpperCase()); + fieldSpec = new TimeFieldSpec(new TimeGranularitySpec(dataType, timeUnit, fieldName)); + break; + case DATE_TIME: + String format = config.getString(Column.getKeyFor(column, Column.DATETIME_FORMAT)); + String granularity = config.getString(Column.getKeyFor(column, Column.DATETIME_GRANULARITY)); + fieldSpec = new DateTimeFieldSpec(fieldName, dataType, format, granularity, defaultNullValueString, null); + break; + case COMPLEX: + List<String> childFieldNames = + config.getList(String.class, Column.getKeyFor(column, Column.COMPLEX_CHILD_FIELD_NAMES)); + Map<String, FieldSpec> childFieldSpecs = new HashMap<>(); + for (String childField : childFieldNames) { + childFieldSpecs.put(childField, + generateFieldSpec(ComplexFieldSpec.getFullChildName(column, childField), config)); + } + fieldSpec = new ComplexFieldSpec(fieldName, dataType, true, childFieldSpecs); + break; + default: + throw new IllegalStateException("Unsupported field type: " + fieldType); + } + return fieldSpec; + } + public static Builder builder() { return new Builder(); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java index d0b7c84d34..d960e3307c 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java @@ -219,6 +219,7 @@ public class SegmentMetadataImpl implements SegmentMetadata { addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.METRICS), physicalColumns); addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.TIME_COLUMN_NAME), physicalColumns); addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.DATETIME_COLUMNS), physicalColumns); + addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.COMPLEX_COLUMNS), physicalColumns); // Set the table name (for backward compatibility) String tableName = segmentMetadataPropertiesConfiguration.getString(Segment.TABLE_NAME); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java index 2e935bea9a..fd365ae822 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java @@ -20,10 +20,12 @@ package org.apache.pinot.spi.data; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; -import javax.annotation.Nonnull; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.StringUtil; /** @@ -51,6 +53,8 @@ import javax.annotation.Nonnull; */ @JsonIgnoreProperties(ignoreUnknown = true) public final class ComplexFieldSpec extends FieldSpec { + public static final String KEY_FIELD = "key"; + public static final String VALUE_FIELD = "value"; private final Map<String, FieldSpec> _childFieldSpecs; @@ -60,18 +64,19 @@ public final class ComplexFieldSpec extends FieldSpec { _childFieldSpecs = new HashMap<>(); } - public ComplexFieldSpec(@Nonnull String name, DataType dataType, boolean isSingleValueField) { + public ComplexFieldSpec(String name, DataType dataType, boolean isSingleValueField, + Map<String, FieldSpec> childFieldSpecs) { super(name, dataType, isSingleValueField); Preconditions.checkArgument(dataType == DataType.STRUCT || dataType == DataType.MAP || dataType == DataType.LIST); - _childFieldSpecs = new HashMap<>(); + _childFieldSpecs = childFieldSpecs; } - public FieldSpec getChildFieldSpec(String child) { - return _childFieldSpecs.get(child); + public static String[] getColumnPath(String column) { + return column.split("\\$\\$"); } - public void addChildFieldSpec(String child, FieldSpec fieldSpec) { - _childFieldSpecs.put(child, fieldSpec); + public FieldSpec getChildFieldSpec(String child) { + return _childFieldSpecs.get(child); } public Map<String, FieldSpec> getChildFieldSpecs() { @@ -79,7 +84,6 @@ public final class ComplexFieldSpec extends FieldSpec { } @JsonIgnore - @Nonnull @Override public FieldType getFieldType() { return FieldType.COMPLEX; @@ -87,6 +91,63 @@ public final class ComplexFieldSpec extends FieldSpec { @Override public String toString() { - return "field type: COMPLEX, field name: " + _name + ", root data type: " + _dataType; + return "field type: COMPLEX, field name: " + _name + ", root data type: " + _dataType + ", child field specs: " + + _childFieldSpecs; + } + + public static class MapFieldSpec { + private final String _fieldName; + private final FieldSpec _keyFieldSpec; + private final FieldSpec _valueFieldSpec; + + private MapFieldSpec(ComplexFieldSpec complexFieldSpec) { + Preconditions.checkState(complexFieldSpec.getChildFieldSpecs().containsKey(KEY_FIELD), + "Missing 'key' in the 'childFieldSpec'"); + Preconditions.checkState(complexFieldSpec.getChildFieldSpecs().containsKey(VALUE_FIELD), + "Missing 'value' in the 'childFieldSpec'"); + _keyFieldSpec = complexFieldSpec.getChildFieldSpec(KEY_FIELD); + _valueFieldSpec = complexFieldSpec.getChildFieldSpec(VALUE_FIELD); + _fieldName = complexFieldSpec.getName(); + } + + public String getFieldName() { + return _fieldName; + } + + public FieldSpec getKeyFieldSpec() { + return _keyFieldSpec; + } + + public FieldSpec getValueFieldSpec() { + return _valueFieldSpec; + } + } + + public static MapFieldSpec toMapFieldSpec(ComplexFieldSpec complexFieldSpec) { + return new MapFieldSpec(complexFieldSpec); + } + + public static ComplexFieldSpec fromMapFieldSpec(MapFieldSpec mapFieldSpec) { + return new ComplexFieldSpec(mapFieldSpec.getFieldName(), DataType.MAP, true, + Map.of(KEY_FIELD, mapFieldSpec.getKeyFieldSpec(), VALUE_FIELD, mapFieldSpec.getValueFieldSpec())); + } + + /** + * Returns the full child name for the given columns for complex data type. + * E.g. map$$key, map$$value, list$$element, etc. + * This is used in persisting column metadata for complex data types. + */ + public static String getFullChildName(String... columns) { + return StringUtil.join("$$", columns); + } + + public ObjectNode toJsonObject() { + ObjectNode jsonObject = super.toJsonObject(); + ObjectNode childFieldSpecsNode = JsonUtils.newObjectNode(); + for (Map.Entry<String, FieldSpec> entry : _childFieldSpecs.entrySet()) { + childFieldSpecsNode.put(entry.getKey(), entry.getValue().toJsonObject()); + } + jsonObject.put("childFieldSpecs", childFieldSpecsNode); + return jsonObject; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java index 5fa586b6b9..b08c9b1286 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java @@ -20,11 +20,16 @@ package org.apache.pinot.spi.data; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.OptBoolean; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.Serializable; import java.math.BigDecimal; import java.sql.Timestamp; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.utils.BooleanUtils; @@ -50,6 +55,18 @@ import org.apache.pinot.spi.utils.TimestampUtils; * <p>- <code>MaxLengthExceedStrategy</code>: the strategy to handle the case when the string column exceeds the max */ @SuppressWarnings("unused") +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "fieldType", + requireTypeIdForSubtypes = OptBoolean.FALSE +) +@JsonSubTypes({ + @JsonSubTypes.Type(value = DimensionFieldSpec.class, name = "DIMENSION"), + @JsonSubTypes.Type(value = MetricFieldSpec.class, name = "METRIC"), + @JsonSubTypes.Type(value = TimeFieldSpec.class, name = "TIME"), + @JsonSubTypes.Type(value = DateTimeFieldSpec.class, name = "DATE_TIME"), + @JsonSubTypes.Type(value = ComplexFieldSpec.class, name = "COMPLEX") +}) public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { public static final int DEFAULT_MAX_LENGTH = 512; @@ -73,6 +90,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { public static final byte[] DEFAULT_METRIC_NULL_VALUE_OF_BYTES = new byte[0]; public static final FieldSpecMetadata FIELD_SPEC_METADATA; + public static final Map DEFAULT_COMPLEX_NULL_VALUE_OF_MAP = Map.of(); + public static final List DEFAULT_COMPLEX_NULL_VALUE_OF_LIST = List.of(); + static { // The metadata on the valid list of {@link DataType} for each {@link FieldType} // and the default null values for each combination @@ -305,6 +325,16 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { default: throw new IllegalStateException("Unsupported dimension/time data type: " + dataType); } + case COMPLEX: + switch (dataType) { + case MAP: + return DEFAULT_COMPLEX_NULL_VALUE_OF_MAP; + case LIST: + return DEFAULT_COMPLEX_NULL_VALUE_OF_LIST; + case STRUCT: + default: + throw new IllegalStateException("Unsupported complex data type: " + dataType); + } default: throw new IllegalStateException("Unsupported field type: " + fieldType); } @@ -363,6 +393,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { ObjectNode jsonObject = JsonUtils.newObjectNode(); jsonObject.put("name", _name); jsonObject.put("dataType", _dataType.name()); + jsonObject.put("fieldType", getFieldType().toString()); if (!_isSingleValueField) { jsonObject.put("singleValueField", false); } @@ -408,6 +439,12 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { case BYTES: jsonNode.put(key, BytesUtils.toHexString((byte[]) _defaultNullValue)); break; + case MAP: + jsonNode.put(key, JsonUtils.objectToJsonNode(_defaultNullValue)); + break; + case LIST: + jsonNode.put(key, JsonUtils.objectToJsonNode(_defaultNullValue)); + break; default: throw new IllegalStateException("Unsupported data type: " + this); } @@ -584,6 +621,10 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { return value; case BYTES: return BytesUtils.toBytes(value); + case MAP: + return JsonUtils.stringToObject(value, Map.class); + case LIST: + return JsonUtils.stringToObject(value, List.class); default: throw new IllegalStateException(); } @@ -620,6 +661,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { return ((String) value1).compareTo((String) value2); case BYTES: return ByteArray.compare((byte[]) value1, (byte[]) value2); + case MAP: + case LIST: + throw new UnsupportedOperationException("Cannot compare complex data types: " + this); default: throw new IllegalStateException(); } @@ -635,6 +679,13 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { if (this == BYTES) { return BytesUtils.toHexString((byte[]) value); } + if (this == MAP || this == LIST) { + try { + return JsonUtils.objectToString(value); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } return value.toString(); } @@ -663,6 +714,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { return value; case BYTES: return BytesUtils.toByteArray(value); + case MAP: + case LIST: + throw new UnsupportedOperationException("Cannot convert complex data types: " + this); default: throw new IllegalStateException(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java index 63add34987..a7439d5d1f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java @@ -81,6 +81,7 @@ public final class Schema implements Serializable { private final List<String> _dimensionNames = new ArrayList<>(); private final List<String> _metricNames = new ArrayList<>(); private final List<String> _dateTimeNames = new ArrayList<>(); + private final List<String> _complexNames = new ArrayList<>(); // Set to true if this schema has a JSON column (used to quickly decide whether to run JsonStatementOptimizer on // queries or not). private boolean _hasJSONColumn; @@ -249,6 +250,23 @@ public final class Schema implements Serializable { } } + public List<ComplexFieldSpec> getComplexFieldSpecs() { + return _complexFieldSpecs; + } + + /** + * Required by JSON deserializer. DO NOT USE. DO NOT REMOVE. + * Adding @Deprecated to prevent usage + */ + @Deprecated + public void setComplexFieldSpecs(List<ComplexFieldSpec> complexFieldSpecs) { + Preconditions.checkState(_complexFieldSpecs.isEmpty()); + + for (ComplexFieldSpec complexFieldSpec : complexFieldSpecs) { + addField(complexFieldSpec); + } + } + public void addField(FieldSpec fieldSpec) { Preconditions.checkNotNull(fieldSpec); String columnName = fieldSpec.getName(); @@ -274,6 +292,7 @@ public final class Schema implements Serializable { _dateTimeFieldSpecs.add((DateTimeFieldSpec) fieldSpec); break; case COMPLEX: + _complexNames.add(columnName); _complexFieldSpecs.add((ComplexFieldSpec) fieldSpec); break; default: @@ -313,6 +332,11 @@ public final class Schema implements Serializable { _dateTimeNames.remove(index); _dateTimeFieldSpecs.remove(index); break; + case COMPLEX: + index = _complexNames.indexOf(columnName); + _complexNames.remove(index); + _complexFieldSpecs.remove(index); + break; default: throw new UnsupportedOperationException("Unsupported field type: " + fieldType); } @@ -396,6 +420,15 @@ public final class Schema implements Serializable { return null; } + @JsonIgnore + public ComplexFieldSpec getComplexSpec(String complexName) { + FieldSpec fieldSpec = _fieldSpecMap.get(complexName); + if (fieldSpec != null && fieldSpec.getFieldType() == FieldType.COMPLEX) { + return (ComplexFieldSpec) fieldSpec; + } + return null; + } + /** * Fetches the DateTimeFieldSpec for the given time column name. * If the columnName is a DATE_TIME column, returns the DateTimeFieldSpec @@ -431,6 +464,11 @@ public final class Schema implements Serializable { return _dateTimeNames; } + @JsonIgnore + public List<String> getComplexNames() { + return _complexNames; + } + /** * Returns a json representation of the schema. */ @@ -694,9 +732,10 @@ public final class Schema implements Serializable { * Add complex field spec * @param name name of complex (nested) field * @param dataType root data type of complex field + * @param childFieldSpecs map of child field specs */ - public SchemaBuilder addComplex(String name, DataType dataType) { - _schema.addField(new ComplexFieldSpec(name, dataType, /* single value field */ true)); + public SchemaBuilder addComplex(String name, DataType dataType, Map<String, FieldSpec> childFieldSpecs) { + _schema.addField(new ComplexFieldSpec(name, dataType, /* single value field */ true, childFieldSpecs)); return this; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org