raghavyadav01 commented on code in PR #13906:
URL: https://github.com/apache/pinot/pull/13906#discussion_r1777897714


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -338,6 +339,14 @@ public String[] getStringArray(int rowId, int colId) {
     return strings;
   }
 
+  @Override
+  public Map<String, Object> getMap(int rowId, int colId) {
+    int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
+    ByteBuffer buffer = _variableSizeData.slice();
+    buffer.limit(size);
+    return MapUtils.deserializeMap(buffer);

Review Comment:
   How expensive will deserializeMap be?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java:
##########
@@ -816,6 +817,25 @@ public Object convert(Object value, PinotDataType 
sourceType) {
     }
   },
 
+  MAP {
+    @Override
+    public Object convert(Object value, PinotDataType sourceType) {
+      switch (sourceType) {
+        case OBJECT:

Review Comment:
   Are we adding support for the OBJECT type as well?



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ItemTransformFunction.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.operator.ColumnContext;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.MapDataSource;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Evaluates myMap['foo']
+ */
+public class ItemTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "item";

Review Comment:
   Why is this function named `item`?



##########
pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java:
##########
@@ -674,6 +678,33 @@ public static byte[][] extractBytesColumn(DataType 
storedType, DataBlock dataBlo
     return values;
   }
 
+  public static Map[] extractMapColumn(DataType storedType, DataBlock 
dataBlock, int colId,
+      @Nullable RoaringBitmap nullBitmap) {
+    int numRows = dataBlock.getNumberOfRows();
+    Map[] values = new Map[numRows];
+    if (numRows == 0) {
+      return values;
+    }
+    if (storedType == DataType.UNKNOWN) {

Review Comment:
   Is the UNKNOWN type allowed during writes? Should this throw an exception instead?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/ImmutableMapDataSource.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.segment.index.map;
+
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@SuppressWarnings("rawtypes")
+public class ImmutableMapDataSource extends BaseMapDataSource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ImmutableMapDataSource.class);
+  private final MapIndexReader _mapIndexReader;
+
+  public ImmutableMapDataSource(ColumnMetadata columnMetadata, 
ColumnIndexContainer columnIndexContainer) {
+    super(new ImmutableMapDataSourceMetadata(columnMetadata), 
columnIndexContainer);
+    MapIndexReader mapIndexReader = getMapIndex();
+    if (mapIndexReader == null) {
+      // Fallback to use forward index
+      ForwardIndexReader<?> forwardIndex = getForwardIndex();
+      if (forwardIndex instanceof MapIndexReader) {
+        mapIndexReader = (MapIndexReader) forwardIndex;
+      } else {
+        mapIndexReader = new MapIndexReaderWrapper(forwardIndex, 
getFieldSpec());
+      }
+    }
+    _mapIndexReader = mapIndexReader;
+  }
+
+  @Override
+  public MapIndexReader<ForwardIndexReaderContext, IndexReader> 
getMapIndexReader() {
+    return _mapIndexReader;
+  }
+
+  @Override
+  public DataSourceMetadata getKeyDataSourceMetadata(String key) {
+    return null;
+  }
+
+  @Override
+  public ColumnIndexContainer getKeyIndexContainer(String key) {
+    return null;
+  }
+
+  private static class ImmutableMapDataSourceMetadata implements 
DataSourceMetadata {
+    final FieldSpec _fieldSpec;
+    final int _numDocs;
+    final int _numValues;
+    final int _maxNumValuesPerMVEntry;
+    final int _cardinality;
+    final PartitionFunction _partitionFunction;
+    final Set<Integer> _partitions;
+    final Comparable _minValue;
+    final Comparable _maxValue;
+
+    ImmutableMapDataSourceMetadata(ColumnMetadata columnMetadata) {
+      _fieldSpec = columnMetadata.getFieldSpec();
+      _numDocs = columnMetadata.getTotalDocs();
+      _numValues = columnMetadata.getTotalNumberOfEntries();
+      if (_fieldSpec.isSingleValueField()) {
+        _maxNumValuesPerMVEntry = -1;
+      } else {
+        _maxNumValuesPerMVEntry = columnMetadata.getMaxNumberOfMultiValues();

Review Comment:
   Multi-value MAP columns are not supported, so what does this multi-value branch mean?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/OffHeapMapIndexCreator.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.map;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.pinot.segment.spi.index.creator.MapIndexCreator;
+import org.apache.pinot.spi.config.table.MapIndexConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class OffHeapMapIndexCreator implements MapIndexCreator {
+  public OffHeapMapIndexCreator(File indexDir, String name, MapIndexConfig 
indexConfig) {
+  }
+
+  @Override
+  public void add(Map<String, Object> mapValue) {
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {

Review Comment:
   Should this return true?



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java:
##########
@@ -59,4 +61,16 @@ public BlockValSet getBlockValueSet(ExpressionContext 
expression) {
   public BlockValSet getBlockValueSet(String column) {
     return new ProjectionBlockValSet(_dataBlockCache, column, 
_dataSourceMap.get(column));
   }
+
+  @Override
+  public BlockValSet getBlockValueSet(String[] paths) {
+    // TODO: only support one level of path for now, e.g. `map.key`
+    assert paths.length == 2;

Review Comment:
   Should we assert here, or throw an exception?



##########
pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java:
##########
@@ -674,6 +678,33 @@ public static byte[][] extractBytesColumn(DataType 
storedType, DataBlock dataBlo
     return values;
   }
 
+  public static Map[] extractMapColumn(DataType storedType, DataBlock 
dataBlock, int colId,
+      @Nullable RoaringBitmap nullBitmap) {
+    int numRows = dataBlock.getNumberOfRows();

Review Comment:
   Can dataBlock be null here?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java:
##########
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.MapUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/**
+ * Extension of {@link AbstractColumnStatisticsCollector} for Map column type.
+ *
+ * The Map column type is different than other columns in that it is 
essentially recursive.  It contains keys
+ * and those keys are analogous to columns and, as such, have Key level 
statistics. So, this class keeps track of
+ * Map column level statistics _and_ Key level statistics.  The Key Level 
statistics can then be used during
+ * the creation of the Immutable Segment to make decisions about how keys will 
be stored or what Map data structure
+ * to use.
+ *
+ * Assumptions that are made:
+ * 1. Each key has a single type for the value's associated with it across all 
documents.
+ * 2. At this point in the  Pinot process, the type consistency of a key 
should already be enforced, so if a
+ * heterogenous value types for a key are encountered will constructing the 
Map statistics it can be raised as a
+ * fault.
+ */
+public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
+  private ObjectOpenHashSet<String> _keys = new 
ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
+  private final HashMap<String, AbstractColumnStatisticsCollector> _keyStats;
+  private String[] _sortedValues;
+  private int _minLength = Integer.MAX_VALUE;
+  private int _maxLength = 0;
+  private int _maxRowLength = 0;
+  private String _minValue = null;
+  private String _maxValue = null;
+  private boolean _sealed = false;
+  private final String _column;
+
+  public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
+    super(column, statsCollectorConfig);
+    super._sorted = false;
+    _keyStats = new HashMap<>();
+    _column = column;
+  }
+
+  public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
+    return _keyStats.get(key);
+  }
+
+  @Override
+  public void collect(Object entry) {
+    assert !_sealed;
+
+    if (entry instanceof Map) {
+      final Map<String, Object> mapValue = (Map<String, Object>) entry;
+      byte[] serializeMap = MapUtils.serializeMap(mapValue);
+
+      _maxRowLength = Math.max(_maxRowLength, serializeMap.length);
+      for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
+        final String key = mapValueEntry.getKey();
+
+        // Record statistics about the key
+        int length = serializeMap.length;
+        if (_keys.add(key)) {
+          if (isPartitionEnabled()) {

Review Comment:
   Can the table be partitioned on Map keys? 



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java:
##########
@@ -59,9 +60,17 @@ public RelDataType createRelDataTypeFromSchema(Schema 
schema) {
   }
 
   private RelDataType toRelDataType(FieldSpec fieldSpec, boolean 
enableNullHandling) {
-    RelDataType type = createSqlType(getSqlTypeName(fieldSpec));
-    if (!fieldSpec.isSingleValueField()) {
-      type = createArrayType(type, -1);
+    SqlTypeName sqlTypeName = getSqlTypeName(fieldSpec);
+    RelDataType type;
+    if (sqlTypeName == SqlTypeName.MAP) {
+      ComplexFieldSpec.MapFieldSpec mapFieldSpec = 
ComplexFieldSpec.toMapFieldSpec((ComplexFieldSpec) fieldSpec);
+      type = 
createMapType(createSqlType(getSqlTypeName(mapFieldSpec.getKeyFieldSpec())),
+          createSqlType(getSqlTypeName(mapFieldSpec.getValueFieldSpec())));
+    } else {
+      type = createSqlType(sqlTypeName);
+      if (!fieldSpec.isSingleValueField()) {
+        type = createArrayType(type, -1);

Review Comment:
   What does -1 mean here?



##########
pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java:
##########
@@ -906,4 +937,26 @@ public static byte[][] extractBytesColumn(DataType 
storedType, DataBlock dataBlo
     }
     return values;
   }
+
+  public static Map[] extractMapColumn(DataType storedType, DataBlock 
dataBlock, int colId, int numMatchedRows,

Review Comment:
   For my understanding, what is the difference between these two 
extractMapColumn overloads?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java:
##########
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.MapUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/**
+ * Extension of {@link AbstractColumnStatisticsCollector} for Map column type.
+ *
+ * The Map column type is different than other columns in that it is 
essentially recursive.  It contains keys
+ * and those keys are analogous to columns and, as such, have Key level 
statistics. So, this class keeps track of
+ * Map column level statistics _and_ Key level statistics.  The Key Level 
statistics can then be used during
+ * the creation of the Immutable Segment to make decisions about how keys will 
be stored or what Map data structure
+ * to use.
+ *
+ * Assumptions that are made:
+ * 1. Each key has a single type for the value's associated with it across all 
documents.
+ * 2. At this point in the  Pinot process, the type consistency of a key 
should already be enforced, so if a
+ * heterogenous value types for a key are encountered will constructing the 
Map statistics it can be raised as a
+ * fault.
+ */
+public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
+  private ObjectOpenHashSet<String> _keys = new 
ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
+  private final HashMap<String, AbstractColumnStatisticsCollector> _keyStats;
+  private String[] _sortedValues;
+  private int _minLength = Integer.MAX_VALUE;
+  private int _maxLength = 0;
+  private int _maxRowLength = 0;
+  private String _minValue = null;
+  private String _maxValue = null;
+  private boolean _sealed = false;
+  private final String _column;
+
+  public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
+    super(column, statsCollectorConfig);
+    super._sorted = false;
+    _keyStats = new HashMap<>();
+    _column = column;
+  }
+
+  public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
+    return _keyStats.get(key);
+  }
+
+  @Override
+  public void collect(Object entry) {
+    assert !_sealed;
+
+    if (entry instanceof Map) {
+      final Map<String, Object> mapValue = (Map<String, Object>) entry;
+      byte[] serializeMap = MapUtils.serializeMap(mapValue);
+
+      _maxRowLength = Math.max(_maxRowLength, serializeMap.length);
+      for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
+        final String key = mapValueEntry.getKey();
+
+        // Record statistics about the key
+        int length = serializeMap.length;
+        if (_keys.add(key)) {
+          if (isPartitionEnabled()) {
+            updatePartition(key);
+          }
+          if (_minValue == null) {
+            _minValue = key;
+          } else {
+            if (key.compareTo(_minValue) < 0) {
+              _minValue = key;
+            }
+          }
+          if (_maxValue == null) {
+            _maxValue = key;
+          } else {
+            if (key.compareTo(_maxValue) > 0) {
+              _maxValue = key;
+            }
+          }
+          _minLength = Math.min(_minLength, length);
+          _maxLength = Math.max(_maxLength, length);
+        }
+
+        // Record statistics about the value within the key
+        AbstractColumnStatisticsCollector keyStats = 
getOrCreateKeyStatsCollector(key, mapValueEntry.getValue());
+        keyStats.collect(mapValueEntry.getValue());
+      }
+      _totalNumberOfEntries++;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public String getMinValue() {
+    if (_sealed) {
+      return _minValue;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for min value");
+  }
+
+  @Override
+  public String getMaxValue() {
+    if (_sealed) {
+      return _maxValue;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for max value");
+  }
+
+  @Override
+  public String[] getUniqueValuesSet() {
+    if (_sealed) {
+      return _sortedValues;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for unique values set");
+  }
+
+  @Override
+  public int getLengthOfShortestElement() {
+    return _minLength;
+  }
+
+  @Override
+  public int getLengthOfLargestElement() {
+    if (_sealed) {
+      return _maxLength;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for longest value");
+  }
+
+  @Override
+  public int getMaxRowLengthInBytes() {
+    return _maxRowLength;
+  }
+
+  @Override
+  public int getCardinality() {
+    if (_sealed) {
+      return _sortedValues.length;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+  }
+
+  @Override
+  public void seal() {
+    if (!_sealed) {
+      _sortedValues = _keys.stream().sorted().toArray(String[]::new);
+      _keys = null;
+
+      // Iterate through every key stats collector and seal them
+      for (AbstractColumnStatisticsCollector keyStatsCollector : 
_keyStats.values()) {
+        keyStatsCollector.seal();
+      }
+
+      _sealed = true;
+    }
+  }
+
+  /**
+   * Create a Stats Collector for each Key in the Map.
+   *
+   * NOTE: this could raise an issue if there are millions of keys with very 
few values (Sparse keys, in other words).

Review Comment:
   What are the limits on the size of a Map?



##########
pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java:
##########
@@ -674,6 +678,33 @@ public static byte[][] extractBytesColumn(DataType 
storedType, DataBlock dataBlo
     return values;
   }
 
+  public static Map[] extractMapColumn(DataType storedType, DataBlock 
dataBlock, int colId,
+      @Nullable RoaringBitmap nullBitmap) {
+    int numRows = dataBlock.getNumberOfRows();
+    Map[] values = new Map[numRows];
+    if (numRows == 0) {
+      return values;
+    }
+    if (storedType == DataType.UNKNOWN) {
+      Arrays.fill(values, null);
+      return values;
+    }
+    Preconditions.checkState(storedType == DataType.BYTES,
+        "Cannot extract byte[] values for column: %s with stored type: %s",
+        dataBlock.getDataSchema().getColumnName(colId), storedType);
+    if (nullBitmap == null) {
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        values[rowId] = MapUtils.deserializeMap(dataBlock.getBytes(rowId, 
colId).getBytes());
+      }
+    } else {
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        values[rowId] =
+            !nullBitmap.contains(rowId) ? 
MapUtils.deserializeMap(dataBlock.getBytes(rowId, colId).getBytes()) : null;

Review Comment:
   Why are there two getBytes calls?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java:
##########
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.MapUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/**
+ * Extension of {@link AbstractColumnStatisticsCollector} for Map column type.
+ *
+ * The Map column type is different than other columns in that it is 
essentially recursive.  It contains keys
+ * and those keys are analogous to columns and, as such, have Key level 
statistics. So, this class keeps track of
+ * Map column level statistics _and_ Key level statistics.  The Key Level 
statistics can then be used during
+ * the creation of the Immutable Segment to make decisions about how keys will 
be stored or what Map data structure
+ * to use.
+ *
+ * Assumptions that are made:
+ * 1. Each key has a single type for the value's associated with it across all 
documents.
+ * 2. At this point in the  Pinot process, the type consistency of a key 
should already be enforced, so if a
+ * heterogenous value types for a key are encountered will constructing the 
Map statistics it can be raised as a
+ * fault.
+ */
+public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
+  private ObjectOpenHashSet<String> _keys = new 
ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
+  private final HashMap<String, AbstractColumnStatisticsCollector> _keyStats;
+  private String[] _sortedValues;
+  private int _minLength = Integer.MAX_VALUE;
+  private int _maxLength = 0;
+  private int _maxRowLength = 0;
+  private String _minValue = null;
+  private String _maxValue = null;
+  private boolean _sealed = false;
+  private final String _column;
+
+  public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
+    super(column, statsCollectorConfig);
+    super._sorted = false;
+    _keyStats = new HashMap<>();
+    _column = column;
+  }
+
+  public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
+    return _keyStats.get(key);
+  }
+
+  @Override
+  public void collect(Object entry) {
+    assert !_sealed;
+
+    if (entry instanceof Map) {
+      final Map<String, Object> mapValue = (Map<String, Object>) entry;
+      byte[] serializeMap = MapUtils.serializeMap(mapValue);
+
+      _maxRowLength = Math.max(_maxRowLength, serializeMap.length);
+      for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
+        final String key = mapValueEntry.getKey();
+
+        // Record statistics about the key
+        int length = serializeMap.length;
+        if (_keys.add(key)) {
+          if (isPartitionEnabled()) {
+            updatePartition(key);
+          }
+          if (_minValue == null) {
+            _minValue = key;
+          } else {
+            if (key.compareTo(_minValue) < 0) {
+              _minValue = key;
+            }
+          }
+          if (_maxValue == null) {
+            _maxValue = key;
+          } else {
+            if (key.compareTo(_maxValue) > 0) {
+              _maxValue = key;
+            }
+          }
+          _minLength = Math.min(_minLength, length);
+          _maxLength = Math.max(_maxLength, length);
+        }
+
+        // Record statistics about the value within the key
+        AbstractColumnStatisticsCollector keyStats = 
getOrCreateKeyStatsCollector(key, mapValueEntry.getValue());
+        keyStats.collect(mapValueEntry.getValue());
+      }
+      _totalNumberOfEntries++;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public String getMinValue() {
+    if (_sealed) {
+      return _minValue;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for min value");
+  }
+
+  @Override
+  public String getMaxValue() {
+    if (_sealed) {
+      return _maxValue;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for max value");
+  }
+
+  @Override
+  public String[] getUniqueValuesSet() {
+    if (_sealed) {
+      return _sortedValues;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for unique values set");
+  }
+
+  @Override
+  public int getLengthOfShortestElement() {
+    return _minLength;
+  }
+
+  @Override
+  public int getLengthOfLargestElement() {
+    if (_sealed) {
+      return _maxLength;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for longest value");
+  }
+
+  @Override
+  public int getMaxRowLengthInBytes() {
+    return _maxRowLength;
+  }
+
+  @Override
+  public int getCardinality() {
+    if (_sealed) {
+      return _sortedValues.length;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for cardinality");
+  }
+
+  @Override
+  public void seal() {
+    if (!_sealed) {
+      _sortedValues = _keys.stream().sorted().toArray(String[]::new);
+      _keys = null;
+
+      // Iterate through every key stats collector and seal them
+      for (AbstractColumnStatisticsCollector keyStatsCollector : 
_keyStats.values()) {
+        keyStatsCollector.seal();
+      }
+
+      _sealed = true;
+    }
+  }
+
+  /**
+   * Create a Stats Collector for each Key in the Map.
+   *
+   * NOTE: this could raise an issue if there are millions of keys with very 
few values (Sparse keys, in other words).
+   * So a less memory intensive option may be better for this.
+   *
+   * @param key
+   * @param value
+   * @return
+   */
+  private AbstractColumnStatisticsCollector 
getOrCreateKeyStatsCollector(String key, Object value) {

Review Comment:
   How expensive is this stats collector, especially if there are a large 
number of keys in the MAP?



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java:
##########
@@ -449,6 +450,19 @@ default byte[] getBytes(int docId, T context) {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the MAP type single-value at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return MAP type single-value at the given document id
+   */
+  default Map<String, Object> getMap(int docId, T context) {

Review Comment:
   What indexes are supported on the MAP type?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/MapUtils.java:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+public class MapUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MapUtils.class);
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private MapUtils() {
+  }
+
+  public static byte[] serializeMap(Map<String, Object> map) {
+    int size = map.size();
+
+    // Directly return the size (0) for empty map
+    if (size == 0) {
+      return new byte[Integer.BYTES];
+    }
+
+    // Besides the value bytes, we store: size, length for each key, length 
for each value
+    long bufferSize = (1 + 2 * (long) size) * Integer.BYTES;
+    byte[][] keyBytesArray = new byte[size][];

Review Comment:
   For very large maps, will these become performance bottlenecks?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexReaderWrapper.java:
##########
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.map;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class MapIndexReaderWrapper implements 
org.apache.pinot.segment.spi.index.reader.MapIndexReader {
+  private final ForwardIndexReader _forwardIndexReader;
+  private final ComplexFieldSpec.MapFieldSpec _mapFieldSpec;
+
+  public MapIndexReaderWrapper(ForwardIndexReader forwardIndexReader, 
ComplexFieldSpec.MapFieldSpec mapFieldSpec) {
+    _forwardIndexReader = forwardIndexReader;
+    _mapFieldSpec = mapFieldSpec;
+  }
+
+  @Override
+  public Set<String> getKeys() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<IndexType, IndexReader> getKeyIndexes(String key) {
+    return Map.of(StandardIndexes.forward(), new 
MapKeyIndexReader(_forwardIndexReader, key, getKeyFieldSpec(key)));
+  }
+
+  @Override
+  public FieldSpec getKeyFieldSpec(String key) {
+    return _mapFieldSpec.getValueFieldSpec();
+  }
+
+  @Override
+  public FieldSpec.DataType getKeyStoredType(String key) {
+    return _mapFieldSpec.getValueFieldSpec().getDataType();
+  }
+
+  @Override
+  public ColumnMetadata getKeyMetadata(String key) {

Review Comment:
   None of these methods return any metadata value. Is that expected? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to