This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ee7a52f48 Changes to Enable Pluggable forward Index and Map Index 
Filter Operator for MAP column (#15526)
0ee7a52f48 is described below

commit 0ee7a52f48585787a8a1d845ae3217193448d3d1
Author: RAGHVENDRA KUMAR YADAV <raghavmn...@gmail.com>
AuthorDate: Thu May 22 16:47:47 2025 -0700

    Changes to Enable Pluggable forward Index and Map Index Filter Operator for 
MAP column (#15526)
---
 .../apache/pinot/common/utils/PinotDataType.java   |   6 +
 .../operator/filter/JsonMatchFilterOperator.java   |  36 +++-
 .../core/operator/filter/MapFilterOperator.java    | 215 ++++++++++++++++++
 .../transform/function/ItemTransformFunction.java  |   2 +-
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  12 ++
 .../src/test/resources/TableIndexingTest.csv       |  20 +-
 .../integration/tests/custom/MapFieldTypeTest.java | 240 +++++++++++++++++++--
 .../RealtimeSegmentSegmentCreationDataSource.java  |   2 +-
 .../stats/RealtimeSegmentStatsContainer.java       |  24 ++-
 .../realtime/impl/json/MutableJsonIndexImpl.java   |   9 +
 .../stats/MapColumnPreIndexStatsCollector.java     |  16 ++
 .../segment/index/datasource/BaseDataSource.java   |   7 -
 .../local/segment/index/json/JsonIndexType.java    |   6 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |   2 +-
 .../segment/index/map/BaseMapIndexCreator.java     |  29 ---
 .../segment/index/map/ImmutableMapDataSource.java  |  15 +-
 .../local/segment/index/map/MapIndexHandler.java   |  48 -----
 .../local/segment/index/map/MapIndexPlugin.java    |  34 ---
 .../local/segment/index/map/MapIndexType.java      | 185 ----------------
 .../segment/index/map/MutableMapDataSource.java    |  16 +-
 .../segment/index/map/MutableMapIndexImpl.java     |  45 ----
 .../local/segment/index/map/NullDataSource.java    |   7 -
 .../readers/json/ImmutableJsonIndexReader.java     |   8 +
 .../index/readers/map/ImmutableMapIndexReader.java | 220 -------------------
 .../org/apache/pinot/segment/spi/V1Constants.java  |   2 -
 .../pinot/segment/spi/datasource/DataSource.java   |   7 -
 .../segment/spi/index/ForwardIndexConfig.java      |  25 ++-
 .../pinot/segment/spi/index/StandardIndexes.java   |   9 -
 .../spi/index/creator/JsonIndexCreator.java        |   8 +-
 .../segment/spi/index/creator/MapIndexCreator.java |  44 ----
 .../spi/index/mutable/MutableJsonIndex.java        |   8 +-
 .../segment/spi/index/mutable/MutableMapIndex.java |  70 ------
 .../segment/spi/index/reader/JsonIndexReader.java  |   6 +-
 .../pinot/spi/config/table/IndexingConfig.java     |  24 ---
 .../pinot/spi/config/table/MapIndexConfig.java     |  81 -------
 35 files changed, 601 insertions(+), 887 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index b0b19f255d..b58373dab0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -821,6 +821,12 @@ public enum PinotDataType {
     @Override
     public Object convert(Object value, PinotDataType sourceType) {
       switch (sourceType) {
+        case STRING:
+          try {
+            return JsonUtils.stringToObject(value.toString(), Map.class);
+          } catch (Exception e) {
+            throw new RuntimeException("Unable to convert String to Map. Input 
value: " + value, e);
+          }
         case OBJECT:
         case MAP:
           if (value instanceof Map) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
index 7a8eb8f83c..ea6986556e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.filter;
 import com.google.common.base.CaseFormat;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.Operator;
@@ -41,18 +42,31 @@ public class JsonMatchFilterOperator extends 
BaseFilterOperator {
 
   private final JsonIndexReader _jsonIndex;
   private final JsonMatchPredicate _predicate;
+  private final FilterContext _filterContext;
 
-  public JsonMatchFilterOperator(JsonIndexReader jsonIndex, JsonMatchPredicate 
predicate,
-      int numDocs) {
+  /**
+   * Constructor that takes a Json Predicate
+   */
+  public JsonMatchFilterOperator(JsonIndexReader jsonIndex, JsonMatchPredicate 
predicate, int numDocs) {
     super(numDocs, false);
     _jsonIndex = jsonIndex;
     _predicate = predicate;
+    _filterContext = null;
+  }
+
+  /**
+   * Constructor that takes a FilterContext
+   */
+  public JsonMatchFilterOperator(JsonIndexReader jsonIndex, FilterContext 
filterContext, int numDocs) {
+    super(numDocs, false);
+    _jsonIndex = jsonIndex;
+    _filterContext = filterContext;
+    _predicate = null;
   }
 
   @Override
   protected BlockDocIdSet getTrues() {
-    ImmutableRoaringBitmap bitmap =
-        _jsonIndex.getMatchingDocIds(_predicate.getValue(), 
_predicate.getCountPredicate());
+    ImmutableRoaringBitmap bitmap = getMatchingDocIdBitmap();
     record(bitmap);
     return new BitmapDocIdSet(bitmap, _numDocs);
   }
@@ -64,7 +78,7 @@ public class JsonMatchFilterOperator extends 
BaseFilterOperator {
 
   @Override
   public int getNumMatchingDocs() {
-    return _jsonIndex.getMatchingDocIds(_predicate.getValue(), 
_predicate.getCountPredicate()).getCardinality();
+    return getMatchingDocIdBitmap().getCardinality();
   }
 
   @Override
@@ -74,8 +88,8 @@ public class JsonMatchFilterOperator extends 
BaseFilterOperator {
 
   @Override
   public BitmapCollection getBitmaps() {
-    return new BitmapCollection(_numDocs, false,
-        _jsonIndex.getMatchingDocIds(_predicate.getValue(), 
_predicate.getCountPredicate()));
+    ImmutableRoaringBitmap bitmap = getMatchingDocIdBitmap();
+    return new BitmapCollection(_numDocs, false, bitmap);
   }
 
   @Override
@@ -112,4 +126,12 @@ public class JsonMatchFilterOperator extends 
BaseFilterOperator {
       recording.setNumDocsMatchingAfterFilter(bitmap.getCardinality());
     }
   }
+
+  private ImmutableRoaringBitmap getMatchingDocIdBitmap() {
+    if (_predicate != null) {
+      return _jsonIndex.getMatchingDocIds(_predicate.getValue(), 
_predicate.getCountPredicate());
+    } else {
+      return _jsonIndex.getMatchingDocIds(_filterContext);
+    }
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
new file mode 100644
index 0000000000..ee413306b3
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.CaseFormat;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotInPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+
+
+/**
+ * Filter operator for Map matching that internally uses 
JsonMatchFilterOperator or ExpressionFilterOperator.
+ * This operator converts map predicates to JSON predicates and delegates 
filtering operations
+ * to JsonMatchFilterOperator.
+ */
+public class MapFilterOperator extends BaseFilterOperator {
+  private static final String EXPLAIN_NAME = "FILTER_MAP";
+
+  private final JsonMatchFilterOperator _jsonMatchOperator;
+  private final ExpressionFilterOperator _expressionFilterOperator;
+  private final String _columnName;
+  private final String _keyName;
+  private final Predicate _predicate;
+
+  public MapFilterOperator(IndexSegment indexSegment, Predicate predicate, 
QueryContext queryContext,
+      int numDocs) {
+    super(numDocs, false);
+    _predicate = predicate;
+
+    // Get column name and key name from function arguments
+    List<ExpressionContext> arguments = 
predicate.getLhs().getFunction().getArguments();
+    if (arguments.size() != 2) {
+      throw new IllegalStateException("Expected two arguments (column name and 
key name), found: " + arguments.size());
+    }
+
+    _columnName = arguments.get(0).getIdentifier();
+    _keyName = arguments.get(1).getLiteral().getStringValue();
+
+    // Get JSON index and create operator
+    DataSource dataSource = indexSegment.getDataSource(_columnName);
+    JsonIndexReader jsonIndex = dataSource.getJsonIndex();
+    if (jsonIndex != null && useJsonIndex(_predicate.getType())) {
+      FilterContext filterContext = createFilterContext();
+      _jsonMatchOperator = new JsonMatchFilterOperator(jsonIndex, 
filterContext, numDocs);
+      _expressionFilterOperator = null;
+    } else {
+      _jsonMatchOperator = null;
+      _expressionFilterOperator = new ExpressionFilterOperator(indexSegment, 
queryContext, predicate, numDocs);
+    }
+  }
+
+  /**
+   * Creates a FilterContext based on the original predicate type
+   */
+  private FilterContext createFilterContext() {
+    // Create identifier expression for the JSON column
+    ExpressionContext keyLhs = ExpressionContext.forIdentifier(_keyName);
+
+    // Create predicate based on type
+    Predicate predicate;
+    switch (_predicate.getType()) {
+      case EQ:
+        predicate = new EqPredicate(keyLhs, ((EqPredicate) 
_predicate).getValue());
+        break;
+      case NOT_EQ:
+        predicate = new NotEqPredicate(keyLhs, ((NotEqPredicate) 
_predicate).getValue());
+        break;
+      case IN:
+        predicate = new InPredicate(keyLhs, ((InPredicate) 
_predicate).getValues());
+        break;
+      case NOT_IN:
+        predicate = new NotInPredicate(keyLhs, ((NotInPredicate) 
_predicate).getValues());
+        break;
+      default:
+        throw new IllegalStateException(
+            "Unsupported predicate type for creating filter context: " + 
_predicate.getType());
+    }
+
+    return FilterContext.forPredicate(predicate);
+  }
+
+  @Override
+  protected BlockDocIdSet getTrues() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.getTrues();
+    } else {
+      return _expressionFilterOperator.getTrues();
+    }
+  }
+
+  @Override
+  public boolean canOptimizeCount() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.canOptimizeCount();
+    } else {
+      return _expressionFilterOperator.canOptimizeCount();
+    }
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.getNumMatchingDocs();
+    } else {
+      return _expressionFilterOperator.getNumMatchingDocs();
+    }
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.canProduceBitmaps();
+    } else {
+      return _expressionFilterOperator.canProduceBitmaps();
+    }
+  }
+
+  @Override
+  public BitmapCollection getBitmaps() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.getBitmaps();
+    } else {
+      return _expressionFilterOperator.getBitmaps();
+    }
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String toExplainString() {
+    StringBuilder stringBuilder =
+        new 
StringBuilder(EXPLAIN_NAME).append("(column:").append(_columnName).append(",key:").append(_keyName)
+            
.append(",indexLookUp:map_index").append(",operator:").append(_predicate.getType()).append(",predicate:")
+            .append(_predicate);
+
+    if (_jsonMatchOperator != null) {
+      stringBuilder.append(",delegateTo:json_match");
+    } else {
+      stringBuilder.append(",delegateTo:expression_filter");
+    }
+
+    return stringBuilder.append(')').toString();
+  }
+
+  @Override
+  protected String getExplainName() {
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, 
EXPLAIN_NAME);
+  }
+
+  @Override
+  protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
+    super.explainAttributes(attributeBuilder);
+    attributeBuilder.putString("column", _columnName);
+    attributeBuilder.putString("key", _keyName);
+    attributeBuilder.putString("indexLookUp", "map_index");
+    attributeBuilder.putString("operator", _predicate.getType().name());
+    attributeBuilder.putString("predicate", _predicate.toString());
+
+    if (_jsonMatchOperator != null) {
+      attributeBuilder.putString("delegateTo", "json_match");
+    } else {
+      attributeBuilder.putString("delegateTo", "expression_filter");
+    }
+  }
+
+  /**
+   * Determines whether to use JSON index for the given predicate type.
+   *
+   * @param predicateType The type of predicate
+   * @return true if the predicate type is supported for JSON index, false 
otherwise
+   */
+  private boolean useJsonIndex(Predicate.Type predicateType) {
+    switch (predicateType) {
+      case EQ:
+      case NOT_EQ:
+      case IN:
+      case NOT_IN:
+        return true;
+      default:
+        return false;
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ItemTransformFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ItemTransformFunction.java
index 1aada4da54..cfb80760ee 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ItemTransformFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ItemTransformFunction.java
@@ -105,7 +105,7 @@ public class ItemTransformFunction extends 
BaseTransformFunction {
 
   @Override
   public int[] transformToDictIdsSV(ValueBlock valueBlock) {
-    return transformToIntValuesSV(valueBlock);
+    return valueBlock.getBlockValueSet(_keyPath).getDictionaryIdsSV();
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 1c7a5f95b0..73f148c9c4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -43,6 +43,7 @@ import 
org.apache.pinot.core.operator.filter.FilterOperatorUtils;
 import org.apache.pinot.core.operator.filter.H3InclusionIndexFilterOperator;
 import org.apache.pinot.core.operator.filter.H3IndexFilterOperator;
 import org.apache.pinot.core.operator.filter.JsonMatchFilterOperator;
+import org.apache.pinot.core.operator.filter.MapFilterOperator;
 import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
 import org.apache.pinot.core.operator.filter.TextContainsFilterOperator;
 import org.apache.pinot.core.operator.filter.TextMatchFilterOperator;
@@ -50,6 +51,7 @@ import 
org.apache.pinot.core.operator.filter.VectorSimilarityFilterOperator;
 import 
org.apache.pinot.core.operator.filter.predicate.FSTBasedRegexpPredicateEvaluatorFactory;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import 
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.operator.transform.function.ItemTransformFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex;
 import 
org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader;
@@ -192,6 +194,14 @@ public class FilterPlanNode implements PlanNode {
     }
   }
 
+  private boolean canApplyMapFilter(Predicate predicate) {
+    // Get column name and key name from function arguments
+    FunctionContext function = predicate.getLhs().getFunction();
+
+    // Check if the function is an ItemTransformFunction
+    return 
function.getFunctionName().equals(ItemTransformFunction.FUNCTION_NAME);
+  }
+
   /**
    * Helper method to build the operator tree from the filter.
    */
@@ -238,6 +248,8 @@ public class FilterPlanNode implements PlanNode {
             return new H3IndexFilterOperator(_indexSegment, _queryContext, 
predicate, numDocs);
           } else if (canApplyH3IndexForInclusionCheck(predicate, 
lhs.getFunction())) {
             return new H3InclusionIndexFilterOperator(_indexSegment, 
_queryContext, predicate, numDocs);
+          } else if (canApplyMapFilter(predicate)) {
+            return new MapFilterOperator(_indexSegment, predicate, 
_queryContext, numDocs);
           } else {
             // TODO: ExpressionFilterOperator does not support predicate types 
without PredicateEvaluator (TEXT_MATCH)
             return new ExpressionFilterOperator(_indexSegment, _queryContext, 
predicate, numDocs);
diff --git a/pinot-core/src/test/resources/TableIndexingTest.csv 
b/pinot-core/src/test/resources/TableIndexingTest.csv
index 9ad8a25c30..8214617c93 100644
--- a/pinot-core/src/test/resources/TableIndexingTest.csv
+++ b/pinot-core/src/test/resources/TableIndexingTest.csv
@@ -400,7 +400,7 @@ STRING;map;raw;bloom_filter;true;
 STRING;map;raw;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 STRING;map;raw;h3_index;false;H3 index is currently only supported on BYTES 
columns
 STRING;map;raw;inverted_index;false;Cannot create inverted index for raw index 
column: col
-STRING;map;raw;json_index;false;Json index is currently only supported on 
STRING columns
+STRING;map;raw;json_index;true;
 STRING;map;raw;native_text_index;false;Cannot create text index on column: 
col, it can only be applied to string columns
 STRING;map;raw;text_index;false;Cannot create text index on column: col, it 
can only be applied to string columns
 STRING;map;raw;range_index;false;Unsupported data type: MAP
@@ -411,7 +411,7 @@ STRING;map;dict;bloom_filter;true;
 STRING;map;dict;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 STRING;map;dict;h3_index;false;H3 index is currently only supported on BYTES 
columns
 STRING;map;dict;inverted_index;false;Cannot create inverted index for raw 
index column: col
-STRING;map;dict;json_index;false;Json index is currently only supported on 
STRING columns
+STRING;map;dict;json_index;true;
 STRING;map;dict;native_text_index;false;Cannot create text index on column: 
col, it can only be applied to string columns
 STRING;map;dict;text_index;false;Cannot create text index on column: col, it 
can only be applied to string columns
 STRING;map;dict;range_index;false;Unsupported data type: MAP
@@ -422,7 +422,7 @@ INT;map;raw;bloom_filter;true;
 INT;map;raw;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 INT;map;raw;h3_index;false;H3 index is currently only supported on BYTES 
columns
 INT;map;raw;inverted_index;false;Cannot create inverted index for raw index 
column: col
-INT;map;raw;json_index;false;Json index is currently only supported on STRING 
columns
+INT;map;raw;json_index;true;
 INT;map;raw;native_text_index;false;Cannot create text index on column: col, 
it can only be applied to string columns
 INT;map;raw;text_index;false;Cannot create text index on column: col, it can 
only be applied to string columns
 INT;map;raw;range_index;false;Unsupported data type: MAP
@@ -433,7 +433,7 @@ INT;map;dict;bloom_filter;true;
 INT;map;dict;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 INT;map;dict;h3_index;false;H3 index is currently only supported on BYTES 
columns
 INT;map;dict;inverted_index;false;Cannot create inverted index for raw index 
column: col
-INT;map;dict;json_index;false;Json index is currently only supported on STRING 
columns
+INT;map;dict;json_index;true;
 INT;map;dict;native_text_index;false;Cannot create text index on column: col, 
it can only be applied to string columns
 INT;map;dict;text_index;false;Cannot create text index on column: col, it can 
only be applied to string columns
 INT;map;dict;range_index;false;Unsupported data type: MAP
@@ -444,7 +444,7 @@ LONG;map;raw;bloom_filter;true;
 LONG;map;raw;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 LONG;map;raw;h3_index;false;H3 index is currently only supported on BYTES 
columns
 LONG;map;raw;inverted_index;false;Cannot create inverted index for raw index 
column: col
-LONG;map;raw;json_index;false;Json index is currently only supported on STRING 
columns
+LONG;map;raw;json_index;true;
 LONG;map;raw;native_text_index;false;Cannot create text index on column: col, 
it can only be applied to string columns
 LONG;map;raw;text_index;false;Cannot create text index on column: col, it can 
only be applied to string columns
 LONG;map;raw;range_index;false;Unsupported data type: MAP
@@ -455,7 +455,7 @@ LONG;map;dict;bloom_filter;true;
 LONG;map;dict;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 LONG;map;dict;h3_index;false;H3 index is currently only supported on BYTES 
columns
 LONG;map;dict;inverted_index;false;Cannot create inverted index for raw index 
column: col
-LONG;map;dict;json_index;false;Json index is currently only supported on 
STRING columns
+LONG;map;dict;json_index;true;
 LONG;map;dict;native_text_index;false;Cannot create text index on column: col, 
it can only be applied to string columns
 LONG;map;dict;text_index;false;Cannot create text index on column: col, it can 
only be applied to string columns
 LONG;map;dict;range_index;false;Unsupported data type: MAP
@@ -466,7 +466,7 @@ FLOAT;map;raw;bloom_filter;true;
 FLOAT;map;raw;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 FLOAT;map;raw;h3_index;false;H3 index is currently only supported on BYTES 
columns
 FLOAT;map;raw;inverted_index;false;Cannot create inverted index for raw index 
column: col
-FLOAT;map;raw;json_index;false;Json index is currently only supported on 
STRING columns
+FLOAT;map;raw;json_index;true;
 FLOAT;map;raw;native_text_index;false;Cannot create text index on column: col, 
it can only be applied to string columns
 FLOAT;map;raw;text_index;false;Cannot create text index on column: col, it can 
only be applied to string columns
 FLOAT;map;raw;range_index;false;Unsupported data type: MAP
@@ -477,7 +477,7 @@ FLOAT;map;dict;bloom_filter;true;
 FLOAT;map;dict;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 FLOAT;map;dict;h3_index;false;H3 index is currently only supported on BYTES 
columns
 FLOAT;map;dict;inverted_index;false;Cannot create inverted index for raw index 
column: col
-FLOAT;map;dict;json_index;false;Json index is currently only supported on 
STRING columns
+FLOAT;map;dict;json_index;true;
 FLOAT;map;dict;native_text_index;false;Cannot create text index on column: 
col, it can only be applied to string columns
 FLOAT;map;dict;text_index;false;Cannot create text index on column: col, it 
can only be applied to string columns
 FLOAT;map;dict;range_index;false;Unsupported data type: MAP
@@ -488,7 +488,7 @@ DOUBLE;map;raw;bloom_filter;true;
 DOUBLE;map;raw;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 DOUBLE;map;raw;h3_index;false;H3 index is currently only supported on BYTES 
columns
 DOUBLE;map;raw;inverted_index;false;Cannot create inverted index for raw index 
column: col
-DOUBLE;map;raw;json_index;false;Json index is currently only supported on 
STRING columns
+DOUBLE;map;raw;json_index;true;
 DOUBLE;map;raw;native_text_index;false;Cannot create text index on column: 
col, it can only be applied to string columns
 DOUBLE;map;raw;text_index;false;Cannot create text index on column: col, it 
can only be applied to string columns
 DOUBLE;map;raw;range_index;false;Unsupported data type: MAP
@@ -499,7 +499,7 @@ DOUBLE;map;dict;bloom_filter;true;
 DOUBLE;map;dict;fst_index;false;Cannot create FST index on column: col, it can 
only be applied to dictionary encoded single value string columns
 DOUBLE;map;dict;h3_index;false;H3 index is currently only supported on BYTES 
columns
 DOUBLE;map;dict;inverted_index;false;Cannot create inverted index for raw 
index column: col
-DOUBLE;map;dict;json_index;false;Json index is currently only supported on 
STRING columns
+DOUBLE;map;dict;json_index;true;
 DOUBLE;map;dict;native_text_index;false;Cannot create text index on column: 
col, it can only be applied to string columns
 DOUBLE;map;dict;text_index;false;Cannot create text index on column: col, it 
can only be applied to string columns
 DOUBLE;map;dict;range_index;false;Unsupported data type: MAP
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
index e906c5b865..2974d20482 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapFieldTypeTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.integration.tests.custom;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,9 +28,9 @@ import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -38,12 +39,14 @@ import 
org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 
 
 @Test(suiteName = "CustomClusterIntegrationTest")
 public class MapFieldTypeTest extends CustomDataQueryClusterIntegrationTest {
 
   // Default settings
+  private static final int V1_DEFAULT_SELECTION_COUNT = 10;
   protected static final String DEFAULT_TABLE_NAME = "MapFieldTypeTest";
   private static final int NUM_DOCS = 1000;
   private static final String STRING_MAP_FIELD_NAME = "stringMap";
@@ -84,9 +87,45 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
 
   @Override
   public TableConfig createOfflineTableConfig() {
-    IngestionConfig ingestionConfig = new IngestionConfig();
-    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setIngestionConfig(ingestionConfig)
-        .build();
+    // Create table config with field configs
+    TableConfig config =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setFieldConfigList(createFieldConfigs())
+            .build();
+    return config;
+  }
+
+  private List<FieldConfig> createFieldConfigs() {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    // Create the combined indexes map for STRING_MAP_FIELD_NAME
+    Map<String, Object> stringIndexes = new HashMap<>();
+
+    // Add JSON index with configs at root level
+    Map<String, Object> stringJson = new HashMap<>();
+    stringJson.put("maxLevels", 1);
+    stringJson.put("excludeArray", false);
+    stringJson.put("disableCrossArrayUnnest", true);
+    stringIndexes.put("json", stringJson);
+
+    JsonNode stringIndexesJson = objectMapper.valueToTree(stringIndexes);
+
+    // Create the combined indexes map for INT_MAP_FIELD_NAME
+    Map<String, Object> intIndexes = new HashMap<>();
+
+    // Add JSON index for INT_MAP_FIELD_NAME
+    Map<String, Object> intJson = new HashMap<>();
+    intJson.put("maxLevels", 1);
+    intJson.put("excludeArray", false);
+    intJson.put("disableCrossArrayUnnest", true);
+    intIndexes.put("json", intJson);
+
+    JsonNode intIndexesJson = objectMapper.valueToTree(intIndexes);
+
+    FieldConfig stringMapFieldConfig =
+        new 
FieldConfig.Builder(STRING_MAP_FIELD_NAME).withIndexes(stringIndexesJson).build();
+
+    FieldConfig intMapFieldConfig = new 
FieldConfig.Builder(INT_MAP_FIELD_NAME).withIndexes(intIndexesJson).build();
+    return Arrays.asList(stringMapFieldConfig, intMapFieldConfig);
   }
 
   public List<File> createAvroFiles()
@@ -110,7 +149,7 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
         Map<String, Integer> intMap = new HashMap<>();
         for (int j = 0; j < i; j++) {
           String key = "k" + j;
-          stringMap.put(key, String.valueOf(i));
+          stringMap.put(key, "v" + i);
           intMap.put(key, i);
         }
         GenericData.Record record = new GenericData.Record(avroSchema);
@@ -139,11 +178,9 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     for (int i = 0; i < getSelectionDefaultDocCount(); i++) {
       JsonNode intMap = rows.get(i).get(0);
       JsonNode stringMap = rows.get(i).get(1);
-      assertEquals(intMap.size(), i);
-      assertEquals(stringMap.size(), i);
       for (int j = 0; j < i; j++) {
         assertEquals(intMap.get("k" + j).intValue(), i);
-        assertEquals(stringMap.get("k" + j).textValue(), String.valueOf(i));
+        assertEquals(stringMap.get("k" + j).textValue(), "v" + i);
       }
     }
     // Selection only
@@ -156,7 +193,7 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     assertEquals(rows.get(0).get(0).textValue(), "null");
     assertEquals(rows.get(0).get(1).intValue(), -2147483648);
     for (int i = 1; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).textValue(), String.valueOf(i));
+      assertEquals(rows.get(i).get(0).textValue(), "v" + i);
       assertEquals(rows.get(i).get(1).intValue(), i);
     }
 
@@ -171,16 +208,14 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     assertEquals(rows.get(0).get(0).intValue(), -2147483648);
     assertEquals(rows.get(0).get(1).intValue(), -2147483648);
     assertEquals(rows.get(0).get(2).textValue(), "null");
-    assertEquals(rows.get(0).get(3).textValue(), "null");
     assertEquals(rows.get(1).get(0).intValue(), 1);
     assertEquals(rows.get(1).get(1).intValue(), -2147483648);
-    assertEquals(rows.get(1).get(2).textValue(), "1");
-    assertEquals(rows.get(1).get(3).textValue(), "null");
+    assertEquals(rows.get(1).get(2).textValue(), "v1");
     for (int i = 2; i < getSelectionDefaultDocCount(); i++) {
       assertEquals(rows.get(i).get(0).intValue(), i);
       assertEquals(rows.get(i).get(1).intValue(), i);
-      assertEquals(rows.get(i).get(2).textValue(), String.valueOf(i));
-      assertEquals(rows.get(i).get(3).textValue(), String.valueOf(i));
+      assertEquals(rows.get(i).get(2).textValue(), "v" + i);
+      assertEquals(rows.get(i).get(3).textValue(), "v" + i);
     }
 
     // Aggregation only
@@ -200,19 +235,19 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     assertEquals(rows.get(0).get(0).textValue(), "null");
     assertEquals(rows.get(0).get(1).intValue(), Integer.MIN_VALUE);
     for (int i = 1; i < getSelectionDefaultDocCount(); i++) {
-      assertEquals(rows.get(i).get(0).textValue(), String.valueOf(i));
+      assertEquals(rows.get(i).get(0).textValue(), "v" + i);
       assertEquals(rows.get(i).get(1).intValue(), i);
     }
 
     // Filter
-    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1']  = '25'";
+    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] = 'v25'";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), 1);
-    assertEquals(rows.get(0).get(0).textValue(), "25");
+    assertEquals(rows.get(0).get(0).textValue(), "v25");
 
-    query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['k1']  = 25";
+    query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['k1'] = 25";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
@@ -220,12 +255,12 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     assertEquals(rows.get(0).get(0).intValue(), 25);
 
     // Filter on non-existing key
-    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['kk']  = '25'";
+    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['kk'] = 'v25'";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), 0);
-    query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['kk']  = 25";
+    query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['kk'] = 25";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
@@ -242,9 +277,172 @@ public class MapFieldTypeTest extends 
CustomDataQueryClusterIntegrationTest {
     }
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNotEqPredicate(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // Test NOT_EQ predicate with string map
+    String query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] != 'v25'";
+    JsonNode pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    JsonNode rows = pinotResponse.get("resultTable").get("rows");
+    // All records except the one with k1 = 'v25' should be returned
+    // Verify that none of the returned rows have k1 = 'v25'
+    for (int i = 0; i < rows.size(); i++) {
+      assertNotEquals(rows.get(i).get(0).textValue(), "v25");
+    }
+
+    // Test NOT_EQ predicate with int map
+    query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['k1'] != 25";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    rows = pinotResponse.get("resultTable").get("rows");
+    // All records except the one with k1 = 25 should be returned
+    // Verify that none of the returned rows have k1 = 25
+    for (int i = 0; i < rows.size(); i++) {
+      assertNotEquals(rows.get(i).get(0).textValue(), "v25");
+    }
+
+    // Test NOT_EQ predicate with non-existing key
+    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['kk'] != 'v25'";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    rows = pinotResponse.get("resultTable").get("rows");
+    // All records should be returned since the key doesn't exist
+    // assertEquals(rows.size(), 0);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testInPredicate(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // Test IN predicate with string map
+    String query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] IN ('v25', 'v26')";
+    JsonNode pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    JsonNode rows = pinotResponse.get("resultTable").get("rows");
+    // Only records with k1 = 'v25' or 'v26' should be returned
+    assertEquals(rows.size(), 2);
+
+    // Verify the returned values
+    for (int i = 0; i < rows.size(); i++) {
+      String value = rows.get(i).get(0).textValue();
+      assert (value.equals("v25") || value.equals("v26"));
+    }
+
+    // Test IN predicate with int map
+    query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['k1'] IN (25, 26)";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    rows = pinotResponse.get("resultTable").get("rows");
+    // Only records with k1 = 25 or 26 should be returned
+    assertEquals(rows.size(), 2);
+
+    // Verify the returned values
+    for (int i = 0; i < rows.size(); i++) {
+      int value = rows.get(i).get(0).intValue();
+      assert (value == 25 || value == 26);
+    }
+
+    // Test IN predicate with non-existing key
+    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['kk'] IN ('v25', 'v26')";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    rows = pinotResponse.get("resultTable").get("rows");
+    // No records should be returned since the key doesn't exist
+    assertEquals(rows.size(), 0);
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testNotInPredicate(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // Test NOT IN predicate with string map
+    String query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] NOT IN ('v25', 'v26')";
+    JsonNode pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    JsonNode rows = pinotResponse.get("resultTable").get("rows");
+
+    // Verify the returned values
+    for (int i = 0; i < rows.size(); i++) {
+      String value = rows.get(i).get(0).textValue();
+      assert (!value.equals("v25") && !value.equals("v26"));
+    }
+
+    // Test NOT IN predicate with int map
+    query = "SELECT intMap['k2'] FROM " + getTableName() + " WHERE 
intMap['k1'] NOT IN (25, 26)";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    rows = pinotResponse.get("resultTable").get("rows");
+
+    // Verify the returned values
+    for (int i = 0; i < rows.size(); i++) {
+      int value = rows.get(i).get(0).intValue();
+      assert (value != 25 && value != 26);
+    }
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testIsNullPredicate(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // Test IS_NULL predicate with string map
+    String query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] IS NULL";
+    JsonNode pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    JsonNode rows = pinotResponse.get("resultTable").get("rows");
+    assertEquals(rows.size(), 0);
+
+    // Test IS_NULL predicate with non-existing key
+    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['kk'] IS NULL";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    rows = pinotResponse.get("resultTable").get("rows");
+    assertEquals(rows.size(), 0);
+
+    // Test IS_NOT_NULL predicate with string map
+    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] IS NOT NULL";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+    rows = pinotResponse.get("resultTable").get("rows");
+    // All records should be returned since all records have k1 defined
+    if (useMultiStageQueryEngine) {
+      assertEquals(rows.size(), getSelectionDefaultDocCount());
+    } else {
+      //First Two rows are null for k1
+      assertEquals(rows.size(), getSelectionDefaultDocCount());
+    }
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testStringWithQuotes(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    // Test string with single quote in map value
+    String query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] = 'v25''s value'";
+    JsonNode pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+
+    // Test string with multiple single quotes
+    query = "SELECT stringMap['k2'] FROM " + getTableName() + " WHERE 
stringMap['k1'] = 'v25''s ''quoted'' value'";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+
+    // Test IN predicate with quoted strings
+    query = "SELECT stringMap['k2'] FROM " + getTableName()
+        + " WHERE stringMap['k1'] IN ('v25''s value', 'v26''s value')";
+    pinotResponse = postQuery(query);
+    assertEquals(pinotResponse.get("exceptions").size(), 0);
+  }
+
   @Override
   protected void setUseMultiStageQueryEngine(boolean useMultiStageQueryEngine) 
{
     super.setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    _setSelectionDefaultDocCount = useMultiStageQueryEngine ? 1000 : 10;
+    _setSelectionDefaultDocCount = useMultiStageQueryEngine ? NUM_DOCS : 
V1_DEFAULT_SELECTION_COUNT;
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
index a8c521ddbc..297d5e80f9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
@@ -42,7 +42,7 @@ public class RealtimeSegmentSegmentCreationDataSource 
implements SegmentCreation
 
   @Override
   public SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig 
statsCollectorConfig) {
-    return new RealtimeSegmentStatsContainer(_mutableSegment, 
_recordReader.getSortedDocIds());
+    return new RealtimeSegmentStatsContainer(_mutableSegment, 
_recordReader.getSortedDocIds(), statsCollectorConfig);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java
index e9b76af499..ef594e7a36 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/RealtimeSegmentStatsContainer.java
@@ -21,10 +21,15 @@ package 
org.apache.pinot.segment.local.realtime.converter.stats;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.index.map.MutableMapDataSource;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsContainer;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 
 
 /**
@@ -34,15 +39,26 @@ public class RealtimeSegmentStatsContainer implements 
SegmentPreIndexStatsContai
   private final MutableSegment _mutableSegment;
   private final Map<String, ColumnStatistics> _columnStatisticsMap = new 
HashMap<>();
 
-  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, 
@Nullable int[] sortedDocIds) {
+  public RealtimeSegmentStatsContainer(MutableSegment mutableSegment, 
@Nullable int[] sortedDocIds,
+      StatsCollectorConfig statsCollectorConfig) {
     _mutableSegment = mutableSegment;
 
     // Create all column statistics
     for (String columnName : mutableSegment.getPhysicalColumnNames()) {
       DataSource dataSource = mutableSegment.getDataSource(columnName);
-      if (dataSource.getDictionary() != null) {
-        _columnStatisticsMap
-            .put(columnName, new 
MutableColumnStatistics(mutableSegment.getDataSource(columnName), 
sortedDocIds));
+      if (dataSource instanceof MutableMapDataSource) {
+        ForwardIndexReader reader = dataSource.getForwardIndex();
+        MapColumnPreIndexStatsCollector mapColumnPreIndexStatsCollector =
+            new MapColumnPreIndexStatsCollector(dataSource.getColumnName(), 
statsCollectorConfig);
+        int numDocs = dataSource.getDataSourceMetadata().getNumDocs();
+        ForwardIndexReaderContext readerContext = reader.createContext();
+        for (int row = 0; row < numDocs; row++) {
+          mapColumnPreIndexStatsCollector.collect(reader.getMap(row, 
readerContext));
+        }
+        mapColumnPreIndexStatsCollector.seal();
+        _columnStatisticsMap.put(columnName, mapColumnPreIndexStatsCollector);
+      } else if (dataSource.getDictionary() != null) {
+        _columnStatisticsMap.put(columnName, new 
MutableColumnStatistics(dataSource, sortedDocIds));
       } else {
         _columnStatisticsMap.put(columnName, new 
MutableNoDictionaryColStatistics(dataSource));
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 92529b8263..78aea601bd 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -134,6 +134,15 @@ public class MutableJsonIndexImpl implements 
MutableJsonIndex {
     } catch (Exception e) {
       throw new BadQueryRequestException("Invalid json match filter: " + 
filterString);
     }
+    return getMatchingDocIds(filter);
+  }
+
+  @Override
+  public MutableRoaringBitmap getMatchingDocIds(Object filterObj) {
+    if (!(filterObj instanceof FilterContext)) {
+      throw new BadQueryRequestException("Invalid json match filter: " + 
filterObj);
+    }
+    FilterContext filter = (FilterContext) filterObj;
 
     _readLock.lock();
     try {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
index 5198031a65..62e7a346df 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -49,20 +50,27 @@ import 
org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
   private final Object2ObjectOpenHashMap<String, 
AbstractColumnStatisticsCollector> _keyStats =
       new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
+  private final Map<String, Integer> _keyFrequencies = new 
Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
   private String[] _sortedValues;
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private boolean _sealed = false;
+  private ComplexFieldSpec _colFieldSpec;
 
   public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
     super(column, statsCollectorConfig);
     _sorted = false;
+    _colFieldSpec = (ComplexFieldSpec) 
statsCollectorConfig.getFieldSpecForColumn(column);
   }
 
   public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
     return _keyStats.get(key);
   }
 
+  public Map<String, Integer> getAllKeyFrequencies() {
+    return _keyFrequencies;
+  }
+
   @Override
   public void collect(Object entry) {
     assert !_sealed;
@@ -77,6 +85,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
       for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
         String key = mapValueEntry.getKey();
         Object value = mapValueEntry.getValue();
+        _keyFrequencies.merge(key, 1, Integer::sum);
         AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
         if (keyStats == null) {
           keyStats = createKeyStatsCollector(key, value);
@@ -140,6 +149,13 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   @Override
   public void seal() {
     if (!_sealed) {
+      //All the keys which have appeared less than total docs insert default 
null Value in unique values
+      FieldSpec valueFieldSpec = _colFieldSpec.getChildFieldSpec("value");
+      for (Map.Entry<String, Integer> entry : _keyFrequencies.entrySet()) {
+        if (entry.getValue() < _totalNumberOfEntries) {
+          
_keyStats.get(entry.getKey()).collect(valueFieldSpec.getDefaultNullValue());
+        }
+      }
       _sortedValues = _keyStats.keySet().toArray(new String[0]);
       Arrays.sort(_sortedValues);
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
index a98af4877e..584e3e7354 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
@@ -31,7 +31,6 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
-import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
@@ -126,10 +125,4 @@ public abstract class BaseDataSource implements DataSource 
{
   public VectorIndexReader getVectorIndex() {
     return getIndex(StandardIndexes.vector());
   }
-
-  @Nullable
-  @Override
-  public MapIndexReader getMapIndex() {
-    return getIndex(StandardIndexes.map());
-  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
index 9441cbd40b..823c516e0d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java
@@ -93,9 +93,10 @@ public class JsonIndexType extends 
AbstractIndexType<JsonIndexConfig, JsonIndexR
   @Override
   public JsonIndexCreator createIndexCreator(IndexCreationContext context, 
JsonIndexConfig indexConfig)
       throws IOException {
+    FieldSpec.DataType storedType = 
context.getFieldSpec().getDataType().getStoredType();
     Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
         "Json index is currently only supported on single-value columns");
-    
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() 
== FieldSpec.DataType.STRING,
+    Preconditions.checkState(storedType == FieldSpec.DataType.STRING || 
storedType == FieldSpec.DataType.MAP,
         "Json index is currently only supported on STRING columns");
     return context.isOnHeap() ? new 
OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
         indexConfig)
@@ -147,7 +148,8 @@ public class JsonIndexType extends 
AbstractIndexType<JsonIndexConfig, JsonIndexR
         throw new IndexReaderConstraintException(metadata.getColumnName(), 
StandardIndexes.json(),
             "Json index is currently only supported on single-value columns");
       }
-      if (metadata.getFieldSpec().getDataType().getStoredType() != 
FieldSpec.DataType.STRING) {
+      FieldSpec.DataType storedType = 
metadata.getFieldSpec().getDataType().getStoredType();
+      if (storedType != FieldSpec.DataType.STRING && storedType != 
FieldSpec.DataType.MAP) {
         throw new IndexReaderConstraintException(metadata.getColumnName(), 
StandardIndexes.json(),
             "Json index is currently only supported on STRING columns");
       }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 844b886ea3..46e3165be1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -1206,7 +1206,7 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
       forwardIndexConfig = fieldIndexConfig.getConfig(new 
ForwardIndexPlugin().getIndexType());
     }
     if (forwardIndexConfig == null) {
-      forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, 
null, null);
+      forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, 
null, null, null);
     }
 
     return ForwardIndexCreatorFactory.createIndexCreator(indexCreationContext, 
forwardIndexConfig);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/BaseMapIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/BaseMapIndexCreator.java
deleted file mode 100644
index e2e702730e..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/BaseMapIndexCreator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.index.map;
-
-import java.io.File;
-import org.apache.pinot.segment.spi.index.creator.MapIndexCreator;
-import org.apache.pinot.spi.config.table.MapIndexConfig;
-
-
-public abstract class BaseMapIndexCreator implements MapIndexCreator {
-  public BaseMapIndexCreator(File indexDir, String name, MapIndexConfig 
indexConfig) {
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/ImmutableMapDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/ImmutableMapDataSource.java
index dfa1a00733..0ffe5e0a2d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/ImmutableMapDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/ImmutableMapDataSource.java
@@ -41,15 +41,12 @@ public class ImmutableMapDataSource extends 
BaseMapDataSource {
 
   public ImmutableMapDataSource(ColumnMetadata columnMetadata, 
ColumnIndexContainer columnIndexContainer) {
     super(new ImmutableMapDataSourceMetadata(columnMetadata), 
columnIndexContainer);
-    MapIndexReader mapIndexReader = getMapIndex();
-    if (mapIndexReader == null) {
-      // Fallback to use forward index
-      ForwardIndexReader<?> forwardIndex = getForwardIndex();
-      if (forwardIndex instanceof MapIndexReader) {
-        mapIndexReader = (MapIndexReader) forwardIndex;
-      } else {
-        mapIndexReader = new MapIndexReaderWrapper(forwardIndex, 
getFieldSpec());
-      }
+    MapIndexReader mapIndexReader;
+    ForwardIndexReader<?> forwardIndex = getForwardIndex();
+    if (forwardIndex instanceof MapIndexReader) {
+      mapIndexReader = (MapIndexReader) forwardIndex;
+    } else {
+      mapIndexReader = new MapIndexReaderWrapper(forwardIndex, getFieldSpec());
     }
     _mapIndexReader = mapIndexReader;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexHandler.java
deleted file mode 100644
index 5d1fbf6bbf..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.index.map;
-
-import java.util.Map;
-import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.TableConfig;
-
-
-public class MapIndexHandler implements IndexHandler {
-  public MapIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> configsByCol,
-      TableConfig tableConfig) {
-  }
-
-  @Override
-  public void updateIndices(SegmentDirectory.Writer segmentWriter)
-      throws Exception {
-  }
-
-  @Override
-  public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
-      throws Exception {
-    return false;
-  }
-
-  @Override
-  public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
-      throws Exception {
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexPlugin.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexPlugin.java
deleted file mode 100644
index 1d5a2224e1..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexPlugin.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.segment.local.segment.index.map;
-
-import com.google.auto.service.AutoService;
-import org.apache.pinot.segment.spi.index.IndexPlugin;
-
-
-@AutoService(IndexPlugin.class)
-public class MapIndexPlugin implements IndexPlugin<MapIndexType> {
-  public static final MapIndexType INSTANCE = new MapIndexType();
-
-  @Override
-  public MapIndexType getIndexType() {
-    return INSTANCE;
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexType.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexType.java
deleted file mode 100644
index 00d83f9750..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MapIndexType.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.segment.local.segment.index.map;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import 
org.apache.pinot.segment.local.segment.index.readers.map.ImmutableMapIndexReader;
-import org.apache.pinot.segment.spi.ColumnMetadata;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.creator.IndexCreationContext;
-import org.apache.pinot.segment.spi.index.AbstractIndexType;
-import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
-import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
-import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
-import org.apache.pinot.segment.spi.index.StandardIndexes;
-import org.apache.pinot.segment.spi.index.creator.MapIndexCreator;
-import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
-import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
-import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.MapIndexConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-
-
-public class MapIndexType extends AbstractIndexType<MapIndexConfig, 
MapIndexReader, MapIndexCreator> {
-  public static final String INDEX_DISPLAY_NAME = "map";
-  private static final List<String> EXTENSIONS =
-      Collections.singletonList(V1Constants.Indexes.MAP_INDEX_FILE_EXTENSION);
-  private static final String MAP_INDEX_CREATOR_CLASS_NAME = 
"mapIndexCreatorClassName";
-  private static final String MAP_INDEX_READER_CLASS_NAME = 
"mapIndexReaderClassName";
-  private static final String MUTABLE_MAP_INDEX_CLASS_NAME = 
"mutableMapIndexClassName";
-
-  protected MapIndexType() {
-    super(StandardIndexes.MAP_ID);
-  }
-
-  @Override
-  public Class<MapIndexConfig> getIndexConfigClass() {
-    return MapIndexConfig.class;
-  }
-
-  @Override
-  public MapIndexConfig getDefaultConfig() {
-    return MapIndexConfig.DISABLED;
-  }
-
-  @Override
-  public String getPrettyName() {
-    return INDEX_DISPLAY_NAME;
-  }
-
-  @Override
-  public ColumnConfigDeserializer<MapIndexConfig> createDeserializer() {
-    ColumnConfigDeserializer<MapIndexConfig> fromIndexes =
-        IndexConfigDeserializer.fromIndexes(getPrettyName(), 
getIndexConfigClass());
-    ColumnConfigDeserializer<MapIndexConfig> fromMapIndexConfigs =
-        IndexConfigDeserializer.fromMap(tableConfig -> 
tableConfig.getIndexingConfig().getMapIndexConfigs());
-    ColumnConfigDeserializer<MapIndexConfig> fromMapIndexColumns =
-        IndexConfigDeserializer.fromCollection(tableConfig -> 
tableConfig.getIndexingConfig().getMapIndexColumns(),
-            (accum, column) -> accum.put(column, MapIndexConfig.DEFAULT));
-    return 
fromIndexes.withExclusiveAlternative(fromMapIndexConfigs.withFallbackAlternative(fromMapIndexColumns));
-  }
-
-  @Override
-  public MapIndexCreator createIndexCreator(IndexCreationContext context, 
MapIndexConfig indexConfig)
-      throws IOException, ClassNotFoundException, NoSuchMethodException, 
InvocationTargetException,
-             InstantiationException, IllegalAccessException {
-    if (indexConfig.isDisabled()) {
-      return null;
-    }
-    if (indexConfig.getConfigs().containsKey(MAP_INDEX_CREATOR_CLASS_NAME)) {
-      String className = 
indexConfig.getConfigs().get(MAP_INDEX_CREATOR_CLASS_NAME).toString();
-      Preconditions.checkNotNull(className, "MapIndexCreator class name must 
be provided");
-      return (BaseMapIndexCreator) Class.forName(className)
-          .getConstructor(File.class, String.class, 
IndexCreationContext.class, MapIndexConfig.class)
-          .newInstance(context.getIndexDir(), 
context.getFieldSpec().getName(), context, indexConfig);
-    }
-    throw new IllegalArgumentException("MapIndexCreator class name must be 
provided");
-  }
-
-  @Override
-  protected IndexReaderFactory<MapIndexReader> createReaderFactory() {
-    return ReaderFactory.INSTANCE;
-  }
-
-  @Override
-  public List<String> getFileExtensions(@Nullable ColumnMetadata 
columnMetadata) {
-    return EXTENSIONS;
-  }
-
-  @Override
-  public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    return new MapIndexHandler(segmentDirectory, configsByCol, tableConfig);
-  }
-
-  private static class ReaderFactory extends 
IndexReaderFactory.Default<MapIndexConfig, MapIndexReader> {
-    public static final ReaderFactory INSTANCE = new ReaderFactory();
-
-    private ReaderFactory() {
-    }
-
-    @Override
-    protected IndexType<MapIndexConfig, MapIndexReader, ?> getIndexType() {
-      return StandardIndexes.map();
-    }
-
-    @Override
-    protected MapIndexReader createIndexReader(PinotDataBuffer dataBuffer, 
ColumnMetadata metadata,
-        MapIndexConfig indexConfig) {
-      if (indexConfig.isDisabled()) {
-        return null;
-      }
-      if (indexConfig.getConfigs().containsKey(MAP_INDEX_READER_CLASS_NAME)) {
-        String className = 
indexConfig.getConfigs().get(MAP_INDEX_READER_CLASS_NAME).toString();
-        Preconditions.checkNotNull(className, "MapIndexReader class name must 
be provided");
-        try {
-          return (MapIndexReader) 
Class.forName(className).getConstructor(PinotDataBuffer.class, 
ColumnMetadata.class)
-              .newInstance(dataBuffer, metadata);
-        } catch (Exception e) {
-          throw new RuntimeException("Failed to create MapIndexReader", e);
-        }
-      }
-      return new ImmutableMapIndexReader(dataBuffer, metadata);
-    }
-  }
-
-  @Override
-  protected void handleIndexSpecificCleanup(TableConfig tableConfig) {
-    tableConfig.getIndexingConfig().setMapIndexColumns(null);
-    tableConfig.getIndexingConfig().setMapIndexConfigs(null);
-  }
-
-  @Nullable
-  @Override
-  public MutableIndex createMutableIndex(MutableIndexContext context, 
MapIndexConfig config) {
-    if (config.isDisabled()) {
-      return null;
-    }
-    if (!context.getFieldSpec().isSingleValueField()) {
-      return null;
-    }
-
-    if (config.getConfigs().containsKey(MUTABLE_MAP_INDEX_CLASS_NAME)) {
-      String className = 
config.getConfigs().get(MUTABLE_MAP_INDEX_CLASS_NAME).toString();
-      Preconditions.checkNotNull(className, "MutableMapIndex class name must 
be provided");
-      try {
-        return (MutableIndex) 
Class.forName(className).getConstructor(MutableIndexContext.class, 
MapIndexConfig.class)
-            .newInstance(context, config);
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to create MutableMapIndex", e);
-      }
-    }
-
-    return new MutableMapIndexImpl(context, config);
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapDataSource.java
index 4e34c7907c..b0459ff3ae 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapDataSource.java
@@ -55,15 +55,13 @@ public class MutableMapDataSource extends BaseMapDataSource 
{
             partitionFunction, partitions, minValue, maxValue, 
maxRowLengthInBytes),
         new 
ColumnIndexContainer.FromMap.Builder().withAll(mutableIndexes).build());
     _mutableIndexes = mutableIndexes;
-    MapIndexReader mapIndexReader = getMapIndex();
-    if (mapIndexReader == null) {
-      // Fallback to use forward index
-      ForwardIndexReader<?> forwardIndex = getForwardIndex();
-      if (forwardIndex instanceof MapIndexReader) {
-        mapIndexReader = (MapIndexReader) forwardIndex;
-      } else {
-        mapIndexReader = new MapIndexReaderWrapper(forwardIndex, 
getFieldSpec());
-      }
+    MapIndexReader mapIndexReader;
+    // Fallback to use forward index
+    ForwardIndexReader<?> forwardIndex = getForwardIndex();
+    if (forwardIndex instanceof MapIndexReader) {
+      mapIndexReader = (MapIndexReader) forwardIndex;
+    } else {
+      mapIndexReader = new MapIndexReaderWrapper(forwardIndex, getFieldSpec());
     }
     _mapIndexReader = mapIndexReader;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapIndexImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapIndexImpl.java
deleted file mode 100644
index 7e1b160f60..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapIndexImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.index.map;
-
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.pinot.segment.spi.index.mutable.MutableIndex;
-import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
-import org.apache.pinot.spi.config.table.MapIndexConfig;
-
-
-public class MutableMapIndexImpl implements MutableIndex {
-  public MutableMapIndexImpl(MutableIndexContext context, MapIndexConfig 
config) {
-  }
-
-  @Override
-  public void add(@Nonnull Object value, int dictId, int docId) {
-  }
-
-  @Override
-  public void add(@Nonnull Object[] values, @Nullable int[] dictIds, int 
docId) {
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/NullDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/NullDataSource.java
index aab38a44d9..5dc5cf19f9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/NullDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/NullDataSource.java
@@ -35,7 +35,6 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
-import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
@@ -139,12 +138,6 @@ public class NullDataSource implements DataSource {
     return getIndex(StandardIndexes.vector());
   }
 
-  @Nullable
-  @Override
-  public MapIndexReader getMapIndex() {
-    return getIndex(StandardIndexes.map());
-  }
-
   public static class NullDataSourceMetadata implements DataSourceMetadata {
     String _name;
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
index c702c3dac7..596c426f6f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java
@@ -128,7 +128,15 @@ public class ImmutableJsonIndexReader implements 
JsonIndexReader {
     } catch (Exception e) {
       throw new BadQueryRequestException("Invalid json match filter: " + 
filterString);
     }
+    return getMatchingDocIds(filter);
+  }
 
+  @Override
+  public MutableRoaringBitmap getMatchingDocIds(Object filterObj) {
+    if (!(filterObj instanceof FilterContext)) {
+      throw new BadQueryRequestException("Invalid json match filter: " + 
filterObj);
+    }
+    FilterContext filter = (FilterContext) filterObj;
     if (filter.getType() == FilterContext.Type.PREDICATE && 
isExclusive(filter.getPredicate().getType())) {
       // Handle exclusive predicate separately because the flip can only be 
applied to the unflattened doc ids in order
       // to get the correct result, and it cannot be nested
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/map/ImmutableMapIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/map/ImmutableMapIndexReader.java
deleted file mode 100644
index 4db4a6a55e..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/map/ImmutableMapIndexReader.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.index.readers.map;
-
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
-import org.apache.pinot.segment.spi.ColumnMetadata;
-import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
-import org.apache.pinot.segment.spi.index.IndexReader;
-import org.apache.pinot.segment.spi.index.IndexType;
-import org.apache.pinot.segment.spi.index.StandardIndexes;
-import org.apache.pinot.segment.spi.index.creator.MapIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.spi.data.ComplexFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.utils.MapUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Reader for map index.
- * The v1 implementation is just bytes.
- *
- */
-public class ImmutableMapIndexReader implements 
MapIndexReader<ForwardIndexReaderContext, IndexReader> {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ImmutableMapIndexReader.class);
-  // NOTE: Use long type for _numDocs to comply with the RoaringBitmap APIs.
-  protected final PinotDataBuffer _dataBuffer;
-  private final ForwardIndexReader _forwardIndexReader;
-  private final FieldSpec _valueFieldSpec;
-  private final ColumnMetadata _columnMetadata;
-
-  public ImmutableMapIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata 
columnMetadata) {
-    int version = dataBuffer.getInt(0);
-    Preconditions.checkState(version == MapIndexCreator.VERSION_1,
-        "Unsupported map index version: %s.  Valid versions are {}", version, 
MapIndexCreator.VERSION_1);
-    _dataBuffer = dataBuffer;
-    _columnMetadata = columnMetadata;
-    _forwardIndexReader =
-        new VarByteChunkForwardIndexReaderV4(_dataBuffer, 
FieldSpec.DataType.BYTES, true);
-    ComplexFieldSpec complexFieldSpec = (ComplexFieldSpec) 
columnMetadata.getFieldSpec();
-    Preconditions.checkState(
-        
complexFieldSpec.getChildFieldSpec(ComplexFieldSpec.KEY_FIELD).getDataType() == 
FieldSpec.DataType.STRING,
-        "Only String key is supported in Map");
-    _valueFieldSpec = 
complexFieldSpec.getChildFieldSpec(ComplexFieldSpec.VALUE_FIELD);
-  }
-
-  @Override
-  public void close() {
-    // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by 
the caller and might be reused later. The
-    // caller is responsible of closing the PinotDataBuffer.
-  }
-
-  @Override
-  public IndexReader getKeyReader(String key, IndexType type) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public Set<String> getKeys() {
-    return Set.of();
-  }
-
-  @Override
-  public Map<IndexType, IndexReader> getKeyIndexes(String key) {
-    IndexReader fwdIdx = getKeyReader(key, StandardIndexes.forward());
-    if (fwdIdx != null) {
-      return Map.of(StandardIndexes.forward(), getKeyReader(key, 
StandardIndexes.forward()));
-    } else {
-      return null;
-    }
-  }
-
-  @Override
-  public FieldSpec getKeyFieldSpec(String key) {
-    return _valueFieldSpec;
-  }
-
-  @Override
-  public FieldSpec.DataType getKeyStoredType(String key) {
-    return _valueFieldSpec.getDataType();
-  }
-
-  @Override
-  public ColumnMetadata getKeyMetadata(String key) {
-    return new ColumnMetadata() {
-      @Override
-      public FieldSpec getFieldSpec() {
-        return _valueFieldSpec;
-      }
-
-      @Override
-      public int getTotalDocs() {
-        return _columnMetadata.getTotalDocs();
-      }
-
-      @Override
-      public int getCardinality() {
-        return 0;
-      }
-
-      @Override
-      public boolean isSorted() {
-        return false;
-      }
-
-      @Override
-      public Comparable getMinValue() {
-        return null;
-      }
-
-      @Override
-      public Comparable getMaxValue() {
-        return null;
-      }
-
-      @Override
-      public boolean hasDictionary() {
-        return false;
-      }
-
-      @Override
-      public int getColumnMaxLength() {
-        return 0;
-      }
-
-      @Override
-      public int getBitsPerElement() {
-        return 0;
-      }
-
-      @Override
-      public int getMaxNumberOfMultiValues() {
-        return 0;
-      }
-
-      @Override
-      public int getTotalNumberOfEntries() {
-        return 0;
-      }
-
-      @Nullable
-      @Override
-      public PartitionFunction getPartitionFunction() {
-        return null;
-      }
-
-      @Nullable
-      @Override
-      public Set<Integer> getPartitions() {
-        return null;
-      }
-
-      @Override
-      public Map<IndexType<?, ?, ?>, Long> getIndexSizeMap() {
-        return Map.of();
-      }
-
-      @Override
-      public boolean isAutoGenerated() {
-        return false;
-      }
-    };
-  }
-
-  @Override
-  public boolean isDictionaryEncoded() {
-    return false;
-  }
-
-  @Override
-  public boolean isSingleValue() {
-    return true;
-  }
-
-  @Override
-  public FieldSpec.DataType getStoredType() {
-    return FieldSpec.DataType.MAP;
-  }
-
-  @Nullable
-  @Override
-  public ChunkCompressionType getCompressionType() {
-    return ChunkCompressionType.PASS_THROUGH;
-  }
-
-  @Override
-  public Map<String, Object> getMap(int docId, ForwardIndexReaderContext 
mapContext) {
-    return _forwardIndexReader.getMap(docId, mapContext);
-  }
-
-  @Override
-  public String getString(int docId, ForwardIndexReaderContext context) {
-    return MapUtils.toString(getMap(docId, context));
-  }
-}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 59315c8bec..35b319dd60 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -42,11 +42,9 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = 
".sv.raw.fwd";
     public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION = 
".mv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = 
".mv.fwd";
-    public static final String MAP_FORWARD_INDEX_FILE_EXTENSION = ".map.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = 
".bitmap.inv";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = 
".bitmap.range";
     public static final String JSON_INDEX_FILE_EXTENSION = ".json.idx";
-    public static final String MAP_INDEX_FILE_EXTENSION = ".map.idx";
     public static final String NATIVE_TEXT_INDEX_FILE_EXTENSION = 
".nativetext.idx";
     public static final String H3_INDEX_FILE_EXTENSION = ".h3.idx";
     public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java
index fd8883744c..b2ded6a0d8 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java
@@ -28,7 +28,6 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
-import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
@@ -124,10 +123,4 @@ public interface DataSource {
    */
   @Nullable
   VectorIndexReader getVectorIndex();
-
-  /**
-   * Returns the map index for the column if exists, or {@code null} if not.
-   */
-  @Nullable
-  MapIndexReader getMapIndex();
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index b2a794ac2a..a83c94e580 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
@@ -82,7 +83,7 @@ public class ForwardIndexConfig extends IndexConfig {
   }
 
   public static ForwardIndexConfig getDisabled() {
-    return new ForwardIndexConfig(true, null, null, null, null, null, null, 
null);
+    return new ForwardIndexConfig(true, null, null, null, null, null, null, 
null, null);
   }
 
   @Nullable
@@ -97,10 +98,13 @@ public class ForwardIndexConfig extends IndexConfig {
   private final ChunkCompressionType _chunkCompressionType;
   @Nullable
   private final DictIdCompressionType _dictIdCompressionType;
+  @Nullable
+  private final Map<String, Object> _configs;
 
   public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable 
CompressionCodec compressionCodec,
       @Nullable Boolean deriveNumDocsPerChunk, @Nullable Integer 
rawIndexWriterVersion,
-      @Nullable String targetMaxChunkSize, @Nullable Integer 
targetDocsPerChunk) {
+      @Nullable String targetMaxChunkSize, @Nullable Integer 
targetDocsPerChunk,
+      @Nullable Map<String, Object> configs) {
     super(disabled);
     _compressionCodec = compressionCodec;
     _deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk);
@@ -110,7 +114,7 @@ public class ForwardIndexConfig extends IndexConfig {
     _targetMaxChunkSizeBytes =
         targetMaxChunkSize == null ? _defaultTargetMaxChunkSizeBytes : (int) 
DataSizeUtils.toBytes(targetMaxChunkSize);
     _targetDocsPerChunk = targetDocsPerChunk == null ? 
_defaultTargetDocsPerChunk : targetDocsPerChunk;
-
+    _configs = configs != null ? configs : new HashMap<>();
     if (compressionCodec != null) {
       switch (compressionCodec) {
         case PASS_THROUGH:
@@ -158,9 +162,10 @@ public class ForwardIndexConfig extends IndexConfig {
       @JsonProperty("deriveNumDocsPerChunk") @Nullable Boolean 
deriveNumDocsPerChunk,
       @JsonProperty("rawIndexWriterVersion") @Nullable Integer 
rawIndexWriterVersion,
       @JsonProperty("targetMaxChunkSize") @Nullable String targetMaxChunkSize,
-      @JsonProperty("targetDocsPerChunk") @Nullable Integer 
targetDocsPerChunk) {
+      @JsonProperty("targetDocsPerChunk") @Nullable Integer targetDocsPerChunk,
+      @JsonProperty("configs") @Nullable Map<String, Object> configs) {
     this(disabled, getActualCompressionCodec(compressionCodec, 
chunkCompressionType, dictIdCompressionType),
-        deriveNumDocsPerChunk, rawIndexWriterVersion, targetMaxChunkSize, 
targetDocsPerChunk);
+        deriveNumDocsPerChunk, rawIndexWriterVersion, targetMaxChunkSize, 
targetDocsPerChunk, configs);
   }
 
   public static CompressionCodec getActualCompressionCodec(@Nullable 
CompressionCodec compressionCodec,
@@ -234,6 +239,12 @@ public class ForwardIndexConfig extends IndexConfig {
     return _dictIdCompressionType;
   }
 
+  @JsonIgnore
+  @Nullable
+  public Map<String, Object> getConfigs() {
+    return _configs;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -264,6 +275,7 @@ public class ForwardIndexConfig extends IndexConfig {
     private int _rawIndexWriterVersion = _defaultRawIndexWriterVersion;
     private String _targetMaxChunkSize = _defaultTargetMaxChunkSize;
     private int _targetDocsPerChunk = _defaultTargetDocsPerChunk;
+    private Map<String, Object> _configs = new HashMap<>();
 
     public Builder() {
     }
@@ -274,6 +286,7 @@ public class ForwardIndexConfig extends IndexConfig {
       _rawIndexWriterVersion = other._rawIndexWriterVersion;
       _targetMaxChunkSize = other._targetMaxChunkSize;
       _targetDocsPerChunk = other._targetDocsPerChunk;
+      _configs = other._configs;
     }
 
     public Builder withCompressionCodec(CompressionCodec compressionCodec) {
@@ -361,7 +374,7 @@ public class ForwardIndexConfig extends IndexConfig {
 
     public ForwardIndexConfig build() {
       return new ForwardIndexConfig(false, _compressionCodec, 
_deriveNumDocsPerChunk, _rawIndexWriterVersion,
-          _targetMaxChunkSize, _targetDocsPerChunk);
+          _targetMaxChunkSize, _targetDocsPerChunk, _configs);
     }
   }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/StandardIndexes.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/StandardIndexes.java
index 6145b75086..f97ee5252f 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/StandardIndexes.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/StandardIndexes.java
@@ -27,7 +27,6 @@ import 
org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
-import org.apache.pinot.segment.spi.index.creator.MapIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.VectorIndexCreator;
@@ -37,7 +36,6 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
-import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
@@ -45,7 +43,6 @@ import 
org.apache.pinot.segment.spi.index.reader.VectorIndexReader;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
-import org.apache.pinot.spi.config.table.MapIndexConfig;
 
 
 /**
@@ -81,7 +78,6 @@ public class StandardIndexes {
   public static final String TEXT_ID = "text_index";
   public static final String H3_ID = "h3_index";
   public static final String VECTOR_ID = "vector_index";
-  public static final String MAP_ID = "map_index";
 
   private StandardIndexes() {
   }
@@ -140,9 +136,4 @@ public class StandardIndexes {
     return (IndexType<VectorIndexConfig, VectorIndexReader, 
VectorIndexCreator>)
         IndexService.getInstance().get(VECTOR_ID);
   }
-
-  public static IndexType<MapIndexConfig, MapIndexReader, MapIndexCreator> 
map() {
-    return (IndexType<MapIndexConfig, MapIndexReader, MapIndexCreator>)
-        IndexService.getInstance().get(MAP_ID);
-  }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
index a886111756..577460e5b1 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java
@@ -19,9 +19,11 @@
 package org.apache.pinot.segment.spi.index.creator;
 
 import java.io.IOException;
+import java.util.Map;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.spi.utils.JsonUtils;
 
 
 /**
@@ -34,7 +36,11 @@ public interface JsonIndexCreator extends IndexCreator {
   @Override
   default void add(@Nonnull Object value, int dictId)
       throws IOException {
-    add((String) value);
+    if (value instanceof Map) {
+      add(JsonUtils.objectToString(value));
+    } else {
+      add((String) value);
+    }
   }
 
   @Override
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/MapIndexCreator.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/MapIndexCreator.java
deleted file mode 100644
index 5cda04b64f..0000000000
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/MapIndexCreator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.spi.index.creator;
-
-import java.util.Map;
-
-
-/**
- * Creates the durable representation of a map index. Metadata about the Map 
Column can be passed through via
- * the IndexCreationContext and the implementation of this Interface can use 
that to determine the on
- * disk representation of the Map.
- */
-public interface MapIndexCreator extends ForwardIndexCreator {
-  int VERSION_1 = 1;
-
-  /**
-   *
-   * @param value The nonnull value of the cell. In case the cell was actually 
null, a default value is received instead
-   * @param dict This is ignored as the MapIndexCreator will manage the 
construction of dictionaries itself.
-   */
-  @Override
-  default void add(Object value, int dict) {
-    Map<String, Object> mapValue = (Map<String, Object>) value;
-    add(mapValue);
-  }
-
-  void add(Map<String, Object> mapValue);
-}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java
index 9911171484..11600f1dee 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableJsonIndex.java
@@ -20,16 +20,22 @@ package org.apache.pinot.segment.spi.index.mutable;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Map;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+import org.apache.pinot.spi.utils.JsonUtils;
 
 
 public interface MutableJsonIndex extends JsonIndexReader, MutableIndex {
   @Override
   default void add(@Nonnull Object value, int dictId, int docId) {
     try {
-      add((String) value);
+      if (value instanceof Map) {
+        add(JsonUtils.objectToString(value));
+      } else {
+        add((String) value);
+      }
     } catch (IOException ex) {
       throw new UncheckedIOException(ex);
     }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableMapIndex.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableMapIndex.java
deleted file mode 100644
index 27e8989769..0000000000
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableMapIndex.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.spi.index.mutable;
-
-import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.segment.spi.index.reader.MapIndexReader;
-
-
-/**
- * Implementations of this interface can be used to represent indexes that 
store dynamically typed map values.
- */
-public interface MutableMapIndex extends 
MapIndexReader<ForwardIndexReaderContext, MutableIndex>, MutableForwardIndex {
-
-  @Override
-  default void add(@Nonnull Object value, int dictId, int docId) {
-    Map<String, Object> mapValue = (Map<String, Object>) value;
-    add(mapValue, docId);
-  }
-
-  @Override
-  default void add(@Nonnull Object[] values, @Nullable int[] dictIds, int 
docId) {
-    throw new UnsupportedOperationException("MultiValues are not yet supported 
for MAP columns");
-  }
-
-  /**
-   * Adds the given single value cell to the index.
-   *
-   * Unlike {@link org.apache.pinot.segment.spi.index.IndexCreator#add(Object, 
int)}, rows can be added in no
-   * particular order, so the docId is required by this method.
-   *
-   * @param value The nonnull value of the cell. In case the cell was actually 
null, a default value is received instead
-   * @param docId The document id of the given row. A non-negative value.
-   */
-  void add(Map<String, Object> value, int docId);
-
-  /**
-   * Get the Min Value that the given Key has within the segment that this 
Reader is bound to.
-   *
-   * @param key A Key within the given Map column.
-   * @return The minimum value that is bound to that key within the Segment 
that this Reader is bound to.
-   */
-  Comparable<?> getMinValueForKey(String key);
-
-  /**
-   * Get the Max Value that the given Key has within the segment that this 
Reader is bound to.
-   *
-   * @param key A Key within the given Map column.
-   * @return The maximum value that is bound to that key within the Segment 
that this Reader is bound to.
-   */
-  Comparable<?> getMaxValueForKey(String key);
-}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
index bf13b740f3..0f41e502b8 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java
@@ -30,10 +30,12 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  */
 public interface JsonIndexReader extends IndexReader {
 
+  MutableRoaringBitmap getMatchingDocIds(String filterString);
+
   /**
-   * Returns the matching document ids for the given filter.
+   * Returns the matching document ids for the given filter Context.
    */
-  MutableRoaringBitmap getMatchingDocIds(String filterString);
+  MutableRoaringBitmap getMatchingDocIds(Object filterCtx);
 
   /**
    * Returns the matching document ids for the given filter.
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 16642e2a7f..b777bbc294 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -44,8 +44,6 @@ public class IndexingConfig extends BaseJsonConfig {
   @Deprecated
   private List<String> _jsonIndexColumns;
   private Map<String, JsonIndexConfig> _jsonIndexConfigs;
-  private List<String> _mapIndexColumns;
-  private Map<String, MapIndexConfig> _mapIndexConfigs;
   private List<String> _sortedColumn;
   private List<String> _bloomFilterColumns;
   private Map<String, BloomFilterConfig> _bloomFilterConfigs;
@@ -155,22 +153,6 @@ public class IndexingConfig extends BaseJsonConfig {
     _jsonIndexConfigs = jsonIndexConfigs;
   }
 
-  public List<String> getMapIndexColumns() {
-    return _mapIndexColumns;
-  }
-
-  public void setMapIndexColumns(List<String> mapIndexColumns) {
-    _mapIndexColumns = mapIndexColumns;
-  }
-
-  public void setMapIndexConfigs(Map<String, MapIndexConfig> mapIndexConfigs) {
-    _mapIndexConfigs = mapIndexConfigs;
-  }
-
-  public Map<String, MapIndexConfig> getMapIndexConfigs() {
-    return _mapIndexConfigs;
-  }
-
   public boolean isAutoGeneratedInvertedIndex() {
     return _autoGeneratedInvertedIndex;
   }
@@ -442,12 +424,6 @@ public class IndexingConfig extends BaseJsonConfig {
     if (_jsonIndexConfigs != null) {
       allColumns.addAll(_jsonIndexConfigs.keySet());
     }
-    if (_mapIndexColumns != null) {
-      allColumns.addAll(_mapIndexColumns);
-    }
-    if (_mapIndexConfigs != null) {
-      allColumns.addAll(_mapIndexConfigs.keySet());
-    }
     if (_bloomFilterColumns != null) {
       allColumns.addAll(_bloomFilterColumns);
     }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/MapIndexConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/MapIndexConfig.java
deleted file mode 100644
index 9acb3032a8..0000000000
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/MapIndexConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.config.table;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import javax.annotation.Nullable;
-
-
-/**
- * Configs related to the MAP index:
- */
-public class MapIndexConfig extends IndexConfig {
-  public static final MapIndexConfig DEFAULT = new MapIndexConfig();
-  public static final MapIndexConfig DISABLED = new MapIndexConfig(true);
-
-  private final Map<String, Object> _configs;
-
-  public MapIndexConfig() {
-    this(false);
-  }
-
-  public MapIndexConfig(Boolean disabled) {
-    super(disabled);
-    _configs = new HashMap<>();
-  }
-
-  @JsonCreator
-  public MapIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
-      @JsonProperty("configs") @Nullable Map<String, Object> configs) {
-    super(disabled);
-    _configs = configs != null ? configs : new HashMap<>();
-  }
-
-  public Map<String, Object> getConfigs() {
-    return _configs;
-  }
-
-  public void putConfig(String key, Object value) {
-    _configs.put(key, value);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    MapIndexConfig config = (MapIndexConfig) o;
-    return _configs.equals(config._configs);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(super.hashCode(), _configs);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to