This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7522d8a206 Support ComplexFieldSpec in Schema and column metadata 
(#13905)
7522d8a206 is described below

commit 7522d8a20663100c5e1cfa7402014fb55164d112
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Thu Aug 29 21:23:51 2024 -0700

    Support ComplexFieldSpec in Schema and column metadata (#13905)
---
 .../apache/pinot/core/util/SchemaUtilsTest.java    |  48 +++++++-
 .../local/function/FunctionEvaluatorFactory.java   |   2 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  68 ++++++++++-
 .../pinot/segment/local/utils/SchemaUtils.java     |   4 +-
 .../local/segment/index/ColumnMetadataTest.java    |  24 ++++
 .../org/apache/pinot/segment/spi/V1Constants.java  |   3 +
 .../spi/creator/SegmentGeneratorConfig.java        |   4 +
 .../spi/index/metadata/ColumnMetadataImpl.java     | 126 ++++++++++++---------
 .../spi/index/metadata/SegmentMetadataImpl.java    |   1 +
 .../apache/pinot/spi/data/ComplexFieldSpec.java    |  79 +++++++++++--
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  54 +++++++++
 .../java/org/apache/pinot/spi/data/Schema.java     |  43 ++++++-
 12 files changed, 384 insertions(+), 72 deletions(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index e1dd69f771..8a62f79251 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -292,10 +293,10 @@ public class SchemaUtilsTest {
 
   @Test
   public void testValidateCaseInsensitive() {
-    Schema pinotSchema;
-    pinotSchema =
-      new Schema.SchemaBuilder().addTime(new 
TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "incoming"),
-          new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "outgoing"))
+    Schema pinotSchema = new Schema.SchemaBuilder()
+        .addTime(
+            new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, 
"incoming"),
+            new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "outgoing"))
         .addSingleValueDimension("dim1", DataType.INT)
         .addSingleValueDimension("Dim1", DataType.INT)
         .build();
@@ -471,6 +472,45 @@ public class SchemaUtilsTest {
     checkValidationFails(pinotSchema);
   }
 
+  @Test
+  public void testComplexFieldSpec()
+      throws Exception {
+    Schema pinotSchema;
+    // valid schema
+    pinotSchema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension("name", DataType.STRING)
+        .addComplex("intMap", DataType.MAP, Map.of(
+            "key", new DimensionFieldSpec("key", DataType.STRING, true),
+            "value", new DimensionFieldSpec("value", DataType.INT, true)
+        ))
+        .addComplex("stringMap", DataType.MAP, Map.of(
+            "key", new DimensionFieldSpec("key", DataType.STRING, true),
+            "value", new DimensionFieldSpec("value", DataType.STRING, true)
+        ))
+        .build();
+    SchemaUtils.validate(pinotSchema);
+    String schemaStr = pinotSchema.toString();
+    Schema deserSchema = Schema.fromString(schemaStr);
+    Assert.assertEquals(pinotSchema.getSchemaName(), 
deserSchema.getSchemaName());
+    Assert.assertEquals(pinotSchema.getDimensionNames(), 
deserSchema.getDimensionNames());
+    Assert.assertEquals(pinotSchema.getMetricNames(), 
deserSchema.getMetricNames());
+    Assert.assertEquals(pinotSchema.getTimeFieldSpec(), 
deserSchema.getTimeFieldSpec());
+    Assert.assertEquals(pinotSchema.getComplexFieldSpecs().size(), 
deserSchema.getComplexFieldSpecs().size());
+    
Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(0).getChildFieldSpecs().size(),
+        deserSchema.getComplexFieldSpecs().get(0).getChildFieldSpecs().size());
+    
Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD),
+        
deserSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD));
+    
Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD),
+        
deserSchema.getComplexFieldSpecs().get(0).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD));
+
+    
Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(1).getChildFieldSpecs().size(),
+        deserSchema.getComplexFieldSpecs().get(1).getChildFieldSpecs().size());
+    
Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD),
+        
deserSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.KEY_FIELD));
+    
Assert.assertEquals(pinotSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD),
+        
deserSchema.getComplexFieldSpecs().get(1).getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD));
+  }
+
   private void checkValidationFails(Schema pinotSchema, boolean isIgnoreCase) {
     try {
       SchemaUtils.validate(pinotSchema, isIgnoreCase);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java
index 45aa07ebbe..49a62b2274 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/FunctionEvaluatorFactory.java
@@ -64,7 +64,7 @@ public class FunctionEvaluatorFactory {
             "Caught exception while constructing expression evaluator for 
transform expression:" + transformExpression
                 + ", of column:" + columnName);
       }
-    } else if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.TIME)) {
+    } else if (fieldSpec.getFieldType() == FieldSpec.FieldType.TIME) {
 
       // Time conversions should be done using DateTimeFieldSpec and 
transformFunctions
       // But we need below lines for converting TimeFieldSpec's incoming to 
outgoing
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index a422301f96..749c4cf704 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,6 +61,7 @@ import 
org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -285,7 +287,7 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
     String column = spec.getName();
     boolean createDictionary = false;
     if (config.getRawIndexCreationColumns().contains(column) || 
config.getRawIndexCompressionType()
-        .containsKey(column)) {
+        .containsKey(column) || spec instanceof ComplexFieldSpec) {
       return createDictionary;
     }
 
@@ -478,6 +480,7 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
     properties.setProperty(DIMENSIONS, _config.getDimensions());
     properties.setProperty(METRICS, _config.getMetrics());
     properties.setProperty(DATETIME_COLUMNS, _config.getDateTimeColumnNames());
+    properties.setProperty(COMPLEX_COLUMNS, _config.getComplexColumnNames());
     String timeColumnName = _config.getTimeColumnName();
     properties.setProperty(TIME_COLUMN_NAME, timeColumnName);
     properties.setProperty(SEGMENT_TOTAL_DOCS, String.valueOf(_totalDocs));
@@ -610,14 +613,24 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
     }
 
     // datetime field
-    if (fieldSpec.getFieldType().equals(FieldType.DATE_TIME)) {
+    if (fieldSpec.getFieldType() == FieldType.DATE_TIME) {
       DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
       properties.setProperty(getKeyFor(column, DATETIME_FORMAT), 
dateTimeFieldSpec.getFormat());
       properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), 
dateTimeFieldSpec.getGranularity());
     }
 
+    // complex field
+    if (fieldSpec.getFieldType() == FieldType.COMPLEX) {
+      ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES),
+          new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet()));
+      for (Map.Entry<String, FieldSpec> entry : 
complexFieldSpec.getChildFieldSpecs().entrySet()) {
+        addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, 
entry.getKey()), entry.getValue());
+      }
+    }
+
     // NOTE: Min/max could be null for real-time aggregate metrics.
-    if (totalDocs > 0) {
+    if ((fieldSpec.getFieldType() != FieldType.COMPLEX) && (totalDocs > 0)) {
       Object min = columnIndexCreationInfo.getMin();
       Object max = columnIndexCreationInfo.getMax();
       if (min != null && max != null) {
@@ -636,6 +649,55 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
     }
   }
 
+  /**
+   * In order to persist complex field metadata, we need to recursively add 
child field specs
+   * So, each complex field spec will have a property for its child field 
names and each child field will have its
+   * own properties of the detailed field spec.
+   * E.g. a COMPLEX type `intMap` of Map<String, Integer> has 2 child fields:
+   *   - key in STRING type and value in INT type.
+   *   Then we will have the following properties to define a COMPLEX field:
+   *     column.intMap.childFieldNames = [key, value]
+   *     column.intMap$$key.columnType = DIMENSION
+   *     column.intMap$$key.dataType = STRING
+   *     column.intMap$$key.isSingleValued = true
+   *     column.intMap$$value.columnType = DIMENSION
+   *     column.intMap$$value.dataType = INT
+   *     column.intMap$$value.isSingleValued = true
+   */
+  public static void addFieldSpec(PropertiesConfiguration properties, String 
column, FieldSpec fieldSpec) {
+    properties.setProperty(getKeyFor(column, COLUMN_TYPE), 
String.valueOf(fieldSpec.getFieldType()));
+    if (!column.equals(fieldSpec.getName())) {
+      properties.setProperty(getKeyFor(column, COLUMN_NAME), 
String.valueOf(fieldSpec.getName()));
+    }
+    DataType dataType = fieldSpec.getDataType();
+    properties.setProperty(getKeyFor(column, DATA_TYPE), 
String.valueOf(dataType));
+    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), 
String.valueOf(fieldSpec.isSingleValueField()));
+    if (dataType.equals(DataType.STRING) || dataType.equals(DataType.BYTES) || 
dataType.equals(DataType.JSON)) {
+      properties.setProperty(getKeyFor(column, SCHEMA_MAX_LENGTH), 
fieldSpec.getMaxLength());
+      FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = 
fieldSpec.getMaxLengthExceedStrategy();
+      if (maxLengthExceedStrategy != null) {
+        properties.setProperty(getKeyFor(column, 
SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), maxLengthExceedStrategy);
+      }
+    }
+
+    // datetime field
+    if (fieldSpec.getFieldType() == FieldType.DATE_TIME) {
+      DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, DATETIME_FORMAT), 
dateTimeFieldSpec.getFormat());
+      properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), 
dateTimeFieldSpec.getGranularity());
+    }
+
+    // complex field
+    if (fieldSpec.getFieldType() == FieldType.COMPLEX) {
+      ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) fieldSpec;
+      properties.setProperty(getKeyFor(column, COMPLEX_CHILD_FIELD_NAMES),
+          new ArrayList<>(complexFieldSpec.getChildFieldSpecs().keySet()));
+      for (Map.Entry<String, FieldSpec> entry : 
complexFieldSpec.getChildFieldSpecs().entrySet()) {
+        addFieldSpec(properties, ComplexFieldSpec.getFullChildName(column, 
entry.getKey()), entry.getValue());
+      }
+    }
+  }
+
   public static void addColumnMinMaxValueInfo(PropertiesConfiguration 
properties, String column,
       @Nullable Object minValue, @Nullable Object maxValue, DataType 
storedType) {
     String validMinValue = minValue != null ? 
getValidPropertyValue(minValue.toString(), storedType) : null;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java
index 6a060470bf..9661923d30 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SchemaUtils.java
@@ -138,10 +138,10 @@ public class SchemaUtils {
                     + column + "'", e);
           }
         }
-        if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.TIME)) {
+        if (fieldSpec.getFieldType() == FieldSpec.FieldType.TIME) {
           validateTimeFieldSpec((TimeFieldSpec) fieldSpec);
         }
-        if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.DATE_TIME)) {
+        if (fieldSpec.getFieldType() == FieldSpec.FieldType.DATE_TIME) {
           validateDateTimeFieldSpec((DateTimeFieldSpec) fieldSpec);
         }
         if (fieldSpec.getDataType().equals(FieldSpec.DataType.FLOAT) || 
fieldSpec.getDataType()
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
index dbee83216c..b9168e49bd 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
@@ -30,9 +30,13 @@ import 
org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
+import 
org.apache.pinot.segment.local.segment.index.loader.defaultcolumn.DefaultColumnStatistics;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
@@ -40,6 +44,7 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import 
org.apache.pinot.segment.spi.partition.BoundedColumnValuePartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
@@ -49,6 +54,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.SEGMENT_PADDING_CHARACTER;
+
 
 public class ColumnMetadataTest {
   private static final String AVRO_DATA = "data/test_data-mv.avro";
@@ -220,4 +227,21 @@ public class ColumnMetadataTest {
     Assert.assertEquals(installationOutput.getMinValue(),
         "\r\n\r\n  utils   em::C:\\dir\\utils\r\nPSParentPath            : 
Mi");
   }
+
+  @Test
+  public void testComplexFieldSpec() {
+    ComplexFieldSpec intMapFieldSpec = new ComplexFieldSpec("intMap", 
DataType.MAP, true, Map.of(
+        "key", new DimensionFieldSpec("key", DataType.STRING, true),
+        "value", new DimensionFieldSpec("value", DataType.INT, true)
+    ));
+    ColumnIndexCreationInfo columnIndexCreationInfo =
+        new ColumnIndexCreationInfo(new DefaultColumnStatistics(null, null, 
null, false, 1, 1), false, false, false,
+            Map.of());
+    PropertiesConfiguration config = new PropertiesConfiguration();
+    config.setProperty(SEGMENT_PADDING_CHARACTER, 
String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
+    SegmentColumnarIndexCreator.addColumnMetadataInfo(config, "intMap", 
columnIndexCreationInfo, 1, intMapFieldSpec,
+        false, -1);
+    ColumnMetadataImpl intMapColumnMetadata = 
ColumnMetadataImpl.fromPropertiesConfiguration("intMap", config);
+    Assert.assertEquals(intMapColumnMetadata.getFieldSpec(), intMapFieldSpec);
+  }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 3ecec032ad..07988b5be7 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -85,6 +85,7 @@ public class V1Constants {
       public static final String DATETIME_COLUMNS = 
"segment.datetime.column.names";
       public static final String SEGMENT_TOTAL_DOCS = "segment.total.docs";
       public static final String SEGMENT_PADDING_CHARACTER = 
"segment.padding.character";
+      public static final String COMPLEX_COLUMNS = 
"segment.complex.column.names";
 
       public static final String CUSTOM_SUBSET = "custom";
 
@@ -100,6 +101,7 @@ public class V1Constants {
       public static final String DATA_TYPE = "dataType";
       public static final String BITS_PER_ELEMENT = "bitsPerElement";
       public static final String DICTIONARY_ELEMENT_SIZE = "lengthOfEachEntry";
+      public static final String COLUMN_NAME = "columnName";
       public static final String COLUMN_TYPE = "columnType";
       public static final String IS_SORTED = "isSorted";
       public static final String HAS_DICTIONARY = "hasDictionary";
@@ -117,6 +119,7 @@ public class V1Constants {
       public static final String PARTITION_VALUES = "partitionValues";
       public static final String DATETIME_FORMAT = "datetimeFormat";
       public static final String DATETIME_GRANULARITY = "datetimeGranularity";
+      public static final String COMPLEX_CHILD_FIELD_NAMES = 
"complexChildFieldNames";
 
       public static final String COLUMN_PROPS_KEY_PREFIX = "column.";
       public static final String SCHEMA_MAX_LENGTH = "schemaMaxLength";
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index a60946908b..4424be0883 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -669,6 +669,10 @@ public class SegmentGeneratorConfig implements 
Serializable {
     return getQualifyingFields(FieldType.DATE_TIME, true);
   }
 
+  public List<String> getComplexColumnNames() {
+    return getQualifyingFields(FieldType.COMPLEX, true);
+  }
+
   public void setSegmentPartitionConfig(SegmentPartitionConfig 
segmentPartitionConfig) {
     _segmentPartitionConfig = segmentPartitionConfig;
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
index fdd1f5c290..91eed38295 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
@@ -39,11 +39,11 @@ import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.FieldSpec.FieldType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
@@ -221,59 +221,32 @@ public class ColumnMetadataImpl implements ColumnMetadata 
{
         .setTotalNumberOfEntries(config.getInt(Column.getKeyFor(column, 
Column.TOTAL_NUMBER_OF_ENTRIES)))
         .setAutoGenerated(config.getBoolean(Column.getKeyFor(column, 
Column.IS_AUTO_GENERATED), false));
 
-    FieldType fieldType =
-        FieldType.valueOf(config.getString(Column.getKeyFor(column, 
Column.COLUMN_TYPE)).toUpperCase());
-    DataType dataType = 
DataType.valueOf(config.getString(Column.getKeyFor(column, 
Column.DATA_TYPE)).toUpperCase());
-    DataType storedType = dataType.getStoredType();
-    String defaultNullValueString = config.getString(Column.getKeyFor(column, 
Column.DEFAULT_NULL_VALUE), null);
-    if (defaultNullValueString != null && storedType == DataType.STRING) {
-      defaultNullValueString = 
CommonsConfigurationUtils.recoverSpecialCharacterInPropertyValue(defaultNullValueString);
-    }
-    int maxLength = config.getInt(Column.getKeyFor(column, 
Column.SCHEMA_MAX_LENGTH), FieldSpec.DEFAULT_MAX_LENGTH);
-    String maxLengthExceedStrategyString =
-        config.getString(Column.getKeyFor(column, 
Column.SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), null);
-    FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = 
maxLengthExceedStrategyString != null
-        ? 
FieldSpec.MaxLengthExceedStrategy.valueOf(maxLengthExceedStrategyString) : null;
-    FieldSpec fieldSpec;
-    switch (fieldType) {
-      case DIMENSION:
-        boolean isSingleValue = config.getBoolean(Column.getKeyFor(column, 
Column.IS_SINGLE_VALUED));
-        fieldSpec = new DimensionFieldSpec(column, dataType, isSingleValue, 
maxLength,
-            defaultNullValueString, maxLengthExceedStrategy);
-        break;
-      case METRIC:
-        fieldSpec = new MetricFieldSpec(column, dataType, 
defaultNullValueString, maxLength, maxLengthExceedStrategy);
-        break;
-      case TIME:
-        TimeUnit timeUnit = 
TimeUnit.valueOf(config.getString(Segment.TIME_UNIT, "DAYS").toUpperCase());
-        fieldSpec = new TimeFieldSpec(new TimeGranularitySpec(dataType, 
timeUnit, column));
-        break;
-      case DATE_TIME:
-        String format = config.getString(Column.getKeyFor(column, 
Column.DATETIME_FORMAT));
-        String granularity = config.getString(Column.getKeyFor(column, 
Column.DATETIME_GRANULARITY));
-        fieldSpec = new DateTimeFieldSpec(column, dataType, format, 
granularity, defaultNullValueString, null);
-        break;
-      default:
-        throw new IllegalStateException("Unsupported field type: " + 
fieldType);
-    }
+    FieldSpec fieldSpec = generateFieldSpec(column, config);
     builder.setFieldSpec(fieldSpec);
+    DataType storedType = fieldSpec.getDataType().getStoredType();
+
+    if (fieldSpec instanceof ComplexFieldSpec) {
+      // Complex field does not have min/max value
+      builder.setMinValue(null);
+      builder.setMaxValue(null);
+      builder.setMinMaxValueInvalid(true);
+    } else {
+      // Set min/max value if available
+      // NOTE: Use getProperty() instead of getString() to avoid variable 
substitution ('${anotherKey}'), which can
+      //       cause problem for special values such as '$${' where the first 
'$' is identified as escape character.
+      // TODO: Use getProperty() for other properties as well to avoid the 
overhead of variable substitution
+      String minString = (String) config.getProperty(Column.getKeyFor(column, 
Column.MIN_VALUE));
+      String maxString = (String) config.getProperty(Column.getKeyFor(column, 
Column.MAX_VALUE));
+      // Set min/max value if available
+      if (minString != null) {
+        builder.setMinValue(builder.parseValue(storedType, column, minString));
+      }
 
-    // Set min/max value if available
-    // NOTE: Use getProperty() instead of getString() to avoid variable 
substitution ('${anotherKey}'), which can cause
-    //       problem for special values such as '$${' where the first '$' is 
identified as escape character.
-    // TODO: Use getProperty() for other properties as well to avoid the 
overhead of variable substitution
-    String minString = (String) config.getProperty(Column.getKeyFor(column, 
Column.MIN_VALUE));
-    String maxString = (String) config.getProperty(Column.getKeyFor(column, 
Column.MAX_VALUE));
-    // Set min/max value if available
-    if (minString != null) {
-      builder.setMinValue(builder.parseValue(storedType, column, minString));
-    }
-
-    if (maxString != null) {
-      builder.setMaxValue(builder.parseValue(storedType, column, maxString));
+      if (maxString != null) {
+        builder.setMaxValue(builder.parseValue(storedType, column, maxString));
+      }
+      builder.setMinMaxValueInvalid(config.getBoolean(Column.getKeyFor(column, 
Column.MIN_MAX_VALUE_INVALID), false));
     }
-    builder.setMinMaxValueInvalid(config.getBoolean(Column.getKeyFor(column, 
Column.MIN_MAX_VALUE_INVALID), false));
-
     // Only support zero padding
     String padding = config.getString(Segment.SEGMENT_PADDING_CHARACTER, null);
     
Preconditions.checkState(String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR)
@@ -311,6 +284,57 @@ public class ColumnMetadataImpl implements ColumnMetadata {
     return builder.build();
   }
 
+  public static FieldSpec generateFieldSpec(String column, 
PropertiesConfiguration config) {
+    String fieldName = config.getString(Column.getKeyFor(column, 
Column.COLUMN_NAME), column);
+    FieldSpec.FieldType fieldType =
+        FieldSpec.FieldType.valueOf(config.getString(Column.getKeyFor(column, 
Column.COLUMN_TYPE)).toUpperCase());
+    DataType dataType = 
DataType.valueOf(config.getString(Column.getKeyFor(column, 
Column.DATA_TYPE)).toUpperCase());
+    DataType storedType = dataType.getStoredType();
+    String defaultNullValueString = config.getString(Column.getKeyFor(column, 
Column.DEFAULT_NULL_VALUE), null);
+    if (defaultNullValueString != null && storedType == DataType.STRING) {
+      defaultNullValueString = 
CommonsConfigurationUtils.recoverSpecialCharacterInPropertyValue(defaultNullValueString);
+    }
+    int maxLength = config.getInt(Column.getKeyFor(column, 
Column.SCHEMA_MAX_LENGTH), FieldSpec.DEFAULT_MAX_LENGTH);
+    String maxLengthExceedStrategyString =
+        config.getString(Column.getKeyFor(column, 
Column.SCHEMA_MAX_LENGTH_EXCEED_STRATEGY), null);
+    FieldSpec.MaxLengthExceedStrategy maxLengthExceedStrategy = 
maxLengthExceedStrategyString != null
+        ? 
FieldSpec.MaxLengthExceedStrategy.valueOf(maxLengthExceedStrategyString) : null;
+    FieldSpec fieldSpec;
+    switch (fieldType) {
+      case DIMENSION:
+        boolean isSingleValue = config.getBoolean(Column.getKeyFor(column, 
Column.IS_SINGLE_VALUED));
+        fieldSpec = new DimensionFieldSpec(fieldName, dataType, isSingleValue, 
maxLength,
+            defaultNullValueString, maxLengthExceedStrategy);
+        break;
+      case METRIC:
+        fieldSpec =
+            new MetricFieldSpec(fieldName, dataType, defaultNullValueString, 
maxLength, maxLengthExceedStrategy);
+        break;
+      case TIME:
+        TimeUnit timeUnit = 
TimeUnit.valueOf(config.getString(Segment.TIME_UNIT, "DAYS").toUpperCase());
+        fieldSpec = new TimeFieldSpec(new TimeGranularitySpec(dataType, 
timeUnit, fieldName));
+        break;
+      case DATE_TIME:
+        String format = config.getString(Column.getKeyFor(column, 
Column.DATETIME_FORMAT));
+        String granularity = config.getString(Column.getKeyFor(column, 
Column.DATETIME_GRANULARITY));
+        fieldSpec = new DateTimeFieldSpec(fieldName, dataType, format, 
granularity, defaultNullValueString, null);
+        break;
+      case COMPLEX:
+        List<String> childFieldNames =
+            config.getList(String.class, Column.getKeyFor(column, 
Column.COMPLEX_CHILD_FIELD_NAMES));
+        Map<String, FieldSpec> childFieldSpecs = new HashMap<>();
+        for (String childField : childFieldNames) {
+          childFieldSpecs.put(childField,
+              generateFieldSpec(ComplexFieldSpec.getFullChildName(column, 
childField), config));
+        }
+        fieldSpec = new ComplexFieldSpec(fieldName, dataType, true, 
childFieldSpecs);
+        break;
+      default:
+        throw new IllegalStateException("Unsupported field type: " + 
fieldType);
+    }
+    return fieldSpec;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index d0b7c84d34..d960e3307c 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -219,6 +219,7 @@ public class SegmentMetadataImpl implements SegmentMetadata 
{
     
addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.METRICS),
 physicalColumns);
     
addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.TIME_COLUMN_NAME),
 physicalColumns);
     
addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.DATETIME_COLUMNS),
 physicalColumns);
+    
addPhysicalColumns(segmentMetadataPropertiesConfiguration.getList(Segment.COMPLEX_COLUMNS),
 physicalColumns);
 
     // Set the table name (for backward compatibility)
     String tableName = 
segmentMetadataPropertiesConfiguration.getString(Segment.TABLE_NAME);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java
index 2e935bea9a..fd365ae822 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/ComplexFieldSpec.java
@@ -20,10 +20,12 @@ package org.apache.pinot.spi.data;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
-import javax.annotation.Nonnull;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.StringUtil;
 
 
 /**
@@ -51,6 +53,8 @@ import javax.annotation.Nonnull;
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public final class ComplexFieldSpec extends FieldSpec {
+  public static final String KEY_FIELD = "key";
+  public static final String VALUE_FIELD = "value";
 
   private final Map<String, FieldSpec> _childFieldSpecs;
 
@@ -60,18 +64,19 @@ public final class ComplexFieldSpec extends FieldSpec {
     _childFieldSpecs = new HashMap<>();
   }
 
-  public ComplexFieldSpec(@Nonnull String name, DataType dataType, boolean 
isSingleValueField) {
+  public ComplexFieldSpec(String name, DataType dataType, boolean 
isSingleValueField,
+      Map<String, FieldSpec> childFieldSpecs) {
     super(name, dataType, isSingleValueField);
     Preconditions.checkArgument(dataType == DataType.STRUCT || dataType == 
DataType.MAP || dataType == DataType.LIST);
-    _childFieldSpecs = new HashMap<>();
+    _childFieldSpecs = childFieldSpecs;
   }
 
-  public FieldSpec getChildFieldSpec(String child) {
-    return _childFieldSpecs.get(child);
+  public static String[] getColumnPath(String column) {
+    return column.split("\\$\\$");
   }
 
-  public void addChildFieldSpec(String child, FieldSpec fieldSpec) {
-    _childFieldSpecs.put(child, fieldSpec);
+  public FieldSpec getChildFieldSpec(String child) {
+    return _childFieldSpecs.get(child);
   }
 
   public Map<String, FieldSpec> getChildFieldSpecs() {
@@ -79,7 +84,6 @@ public final class ComplexFieldSpec extends FieldSpec {
   }
 
   @JsonIgnore
-  @Nonnull
   @Override
   public FieldType getFieldType() {
     return FieldType.COMPLEX;
@@ -87,6 +91,63 @@ public final class ComplexFieldSpec extends FieldSpec {
 
   @Override
   public String toString() {
-    return "field type: COMPLEX, field name: " + _name + ", root data type: " 
+ _dataType;
+    return "field type: COMPLEX, field name: " + _name + ", root data type: " 
+ _dataType + ", child field specs: "
+        + _childFieldSpecs;
+  }
+
+  public static class MapFieldSpec {
+    private final String _fieldName;
+    private final FieldSpec _keyFieldSpec;
+    private final FieldSpec _valueFieldSpec;
+
+    private MapFieldSpec(ComplexFieldSpec complexFieldSpec) {
+      
Preconditions.checkState(complexFieldSpec.getChildFieldSpecs().containsKey(KEY_FIELD),
+          "Missing 'key' in the 'childFieldSpec'");
+      
Preconditions.checkState(complexFieldSpec.getChildFieldSpecs().containsKey(VALUE_FIELD),
+          "Missing 'value' in the 'childFieldSpec'");
+      _keyFieldSpec = complexFieldSpec.getChildFieldSpec(KEY_FIELD);
+      _valueFieldSpec = complexFieldSpec.getChildFieldSpec(VALUE_FIELD);
+      _fieldName = complexFieldSpec.getName();
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+
+    public FieldSpec getKeyFieldSpec() {
+      return _keyFieldSpec;
+    }
+
+    public FieldSpec getValueFieldSpec() {
+      return _valueFieldSpec;
+    }
+  }
+
+  public static MapFieldSpec toMapFieldSpec(ComplexFieldSpec complexFieldSpec) 
{
+    return new MapFieldSpec(complexFieldSpec);
+  }
+
+  public static ComplexFieldSpec fromMapFieldSpec(MapFieldSpec mapFieldSpec) {
+    return new ComplexFieldSpec(mapFieldSpec.getFieldName(), DataType.MAP, 
true,
+        Map.of(KEY_FIELD, mapFieldSpec.getKeyFieldSpec(), VALUE_FIELD, 
mapFieldSpec.getValueFieldSpec()));
+  }
+
+  /**
+   * Returns the full child name for the given columns for complex data type.
+   * E.g. map$$key, map$$value, list$$element, etc.
+   * This is used in persisting column metadata for complex data types.
+   */
+  public static String getFullChildName(String... columns) {
+    return StringUtil.join("$$", columns);
+  }
+
+  public ObjectNode toJsonObject() {
+    ObjectNode jsonObject = super.toJsonObject();
+    ObjectNode childFieldSpecsNode = JsonUtils.newObjectNode();
+    for (Map.Entry<String, FieldSpec> entry : _childFieldSpecs.entrySet()) {
+      childFieldSpecsNode.put(entry.getKey(), entry.getValue().toJsonObject());
+    }
+    jsonObject.put("childFieldSpecs", childFieldSpecsNode);
+    return jsonObject;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index 5fa586b6b9..b08c9b1286 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -20,11 +20,16 @@ package org.apache.pinot.spi.data;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.OptBoolean;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.BooleanUtils;
@@ -50,6 +55,18 @@ import org.apache.pinot.spi.utils.TimestampUtils;
  * <p>- <code>MaxLengthExceedStrategy</code>: the strategy to handle the case 
when the string column exceeds the max
  */
 @SuppressWarnings("unused")
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    property = "fieldType",
+    requireTypeIdForSubtypes = OptBoolean.FALSE
+)
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = DimensionFieldSpec.class, name = "DIMENSION"),
+    @JsonSubTypes.Type(value = MetricFieldSpec.class, name = "METRIC"),
+    @JsonSubTypes.Type(value = TimeFieldSpec.class, name = "TIME"),
+    @JsonSubTypes.Type(value = DateTimeFieldSpec.class, name = "DATE_TIME"),
+    @JsonSubTypes.Type(value = ComplexFieldSpec.class, name = "COMPLEX")
+})
 public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable 
{
   public static final int DEFAULT_MAX_LENGTH = 512;
 
@@ -73,6 +90,9 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
   public static final byte[] DEFAULT_METRIC_NULL_VALUE_OF_BYTES = new byte[0];
   public static final FieldSpecMetadata FIELD_SPEC_METADATA;
 
+  public static final Map DEFAULT_COMPLEX_NULL_VALUE_OF_MAP = Map.of();
+  public static final List DEFAULT_COMPLEX_NULL_VALUE_OF_LIST = List.of();
+
   static {
     // The metadata on the valid list of {@link DataType} for each {@link 
FieldType}
     // and the default null values for each combination
@@ -305,6 +325,16 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
             default:
               throw new IllegalStateException("Unsupported dimension/time data 
type: " + dataType);
           }
+        case COMPLEX:
+          switch (dataType) {
+            case MAP:
+              return DEFAULT_COMPLEX_NULL_VALUE_OF_MAP;
+            case LIST:
+              return DEFAULT_COMPLEX_NULL_VALUE_OF_LIST;
+            case STRUCT:
+            default:
+              throw new IllegalStateException("Unsupported complex data type: 
" + dataType);
+          }
         default:
           throw new IllegalStateException("Unsupported field type: " + 
fieldType);
       }
@@ -363,6 +393,7 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
     ObjectNode jsonObject = JsonUtils.newObjectNode();
     jsonObject.put("name", _name);
     jsonObject.put("dataType", _dataType.name());
+    jsonObject.put("fieldType", getFieldType().toString());
     if (!_isSingleValueField) {
       jsonObject.put("singleValueField", false);
     }
@@ -408,6 +439,12 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
         case BYTES:
           jsonNode.put(key, BytesUtils.toHexString((byte[]) 
_defaultNullValue));
           break;
+        case MAP:
+          jsonNode.put(key, JsonUtils.objectToJsonNode(_defaultNullValue));
+          break;
+        case LIST:
+          jsonNode.put(key, JsonUtils.objectToJsonNode(_defaultNullValue));
+          break;
         default:
           throw new IllegalStateException("Unsupported data type: " + this);
       }
@@ -584,6 +621,10 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
             return value;
           case BYTES:
             return BytesUtils.toBytes(value);
+          case MAP:
+            return JsonUtils.stringToObject(value, Map.class);
+          case LIST:
+            return JsonUtils.stringToObject(value, List.class);
           default:
             throw new IllegalStateException();
         }
@@ -620,6 +661,9 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
           return ((String) value1).compareTo((String) value2);
         case BYTES:
           return ByteArray.compare((byte[]) value1, (byte[]) value2);
+        case MAP:
+        case LIST:
+          throw new UnsupportedOperationException("Cannot compare complex data 
types: " + this);
         default:
           throw new IllegalStateException();
       }
@@ -635,6 +679,13 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
       if (this == BYTES) {
         return BytesUtils.toHexString((byte[]) value);
       }
+      if (this == MAP || this == LIST) {
+        try {
+          return JsonUtils.objectToString(value);
+        } catch (JsonProcessingException e) {
+          throw new RuntimeException(e);
+        }
+      }
       return value.toString();
     }
 
@@ -663,6 +714,9 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
             return value;
           case BYTES:
             return BytesUtils.toByteArray(value);
+          case MAP:
+          case LIST:
+            throw new UnsupportedOperationException("Cannot convert complex 
data types: " + this);
           default:
             throw new IllegalStateException();
         }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 63add34987..a7439d5d1f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -81,6 +81,7 @@ public final class Schema implements Serializable {
   private final List<String> _dimensionNames = new ArrayList<>();
   private final List<String> _metricNames = new ArrayList<>();
   private final List<String> _dateTimeNames = new ArrayList<>();
+  private final List<String> _complexNames = new ArrayList<>();
   // Set to true if this schema has a JSON column (used to quickly decide 
whether to run JsonStatementOptimizer on
   // queries or not).
   private boolean _hasJSONColumn;
@@ -249,6 +250,23 @@ public final class Schema implements Serializable {
     }
   }
 
+  public List<ComplexFieldSpec> getComplexFieldSpecs() {
+    return _complexFieldSpecs;
+  }
+
+  /**
+   * Required by JSON deserializer. DO NOT USE. DO NOT REMOVE.
+   * Adding @Deprecated to prevent usage
+   */
+  @Deprecated
+  public void setComplexFieldSpecs(List<ComplexFieldSpec> complexFieldSpecs) {
+    Preconditions.checkState(_complexFieldSpecs.isEmpty());
+
+    for (ComplexFieldSpec complexFieldSpec : complexFieldSpecs) {
+      addField(complexFieldSpec);
+    }
+  }
+
   public void addField(FieldSpec fieldSpec) {
     Preconditions.checkNotNull(fieldSpec);
     String columnName = fieldSpec.getName();
@@ -274,6 +292,7 @@ public final class Schema implements Serializable {
         _dateTimeFieldSpecs.add((DateTimeFieldSpec) fieldSpec);
         break;
       case COMPLEX:
+        _complexNames.add(columnName);
         _complexFieldSpecs.add((ComplexFieldSpec) fieldSpec);
         break;
       default:
@@ -313,6 +332,11 @@ public final class Schema implements Serializable {
           _dateTimeNames.remove(index);
           _dateTimeFieldSpecs.remove(index);
           break;
+        case COMPLEX:
+          index = _complexNames.indexOf(columnName);
+          _complexNames.remove(index);
+          _complexFieldSpecs.remove(index);
+          break;
         default:
           throw new UnsupportedOperationException("Unsupported field type: " + 
fieldType);
       }
@@ -396,6 +420,15 @@ public final class Schema implements Serializable {
     return null;
   }
 
+  @JsonIgnore
+  public ComplexFieldSpec getComplexSpec(String complexName) {
+    FieldSpec fieldSpec = _fieldSpecMap.get(complexName);
+    if (fieldSpec != null && fieldSpec.getFieldType() == FieldType.COMPLEX) {
+      return (ComplexFieldSpec) fieldSpec;
+    }
+    return null;
+  }
+
   /**
    * Fetches the DateTimeFieldSpec for the given time column name.
    * If the columnName is a DATE_TIME column, returns the DateTimeFieldSpec
@@ -431,6 +464,11 @@ public final class Schema implements Serializable {
     return _dateTimeNames;
   }
 
+  @JsonIgnore
+  public List<String> getComplexNames() {
+    return _complexNames;
+  }
+
   /**
    * Returns a json representation of the schema.
    */
@@ -694,9 +732,10 @@ public final class Schema implements Serializable {
      * Add complex field spec
      * @param name name of complex (nested) field
      * @param dataType root data type of complex field
+     * @param childFieldSpecs map of child field specs
      */
-    public SchemaBuilder addComplex(String name, DataType dataType) {
-      _schema.addField(new ComplexFieldSpec(name, dataType, /* single value 
field */ true));
+    public SchemaBuilder addComplex(String name, DataType dataType, 
Map<String, FieldSpec> childFieldSpecs) {
+      _schema.addField(new ComplexFieldSpec(name, dataType, /* single value 
field */ true, childFieldSpecs));
       return this;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to