Jackie-Jiang commented on code in PR #15526:
URL: https://github.com/apache/pinot/pull/15526#discussion_r2069618976


##########
pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java:
##########
@@ -238,6 +248,8 @@ private BaseFilterOperator 
constructPhysicalOperator(FilterContext filter, int n
             return new H3IndexFilterOperator(_indexSegment, _queryContext, 
predicate, numDocs);
           } else if (canApplyH3IndexForInclusionCheck(predicate, 
lhs.getFunction())) {
             return new H3InclusionIndexFilterOperator(_indexSegment, 
_queryContext, predicate, numDocs);
+          } else if (canApplyMapFilter(predicate, lhs.getIdentifier())) {

Review Comment:
   The second argument is incorrect



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapIndexFilterOperator.java:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.CaseFormat;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotInPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+
+
+/**
+ * Filter operator for Map matching that internally uses 
JsonMatchFilterOperator or ExpressionFilterOperator.
+ * This operator converts map predicates to JSON predicates and delegates 
filtering operations
+ * to JsonMatchFilterOperator.
+ */
+public class MapIndexFilterOperator extends BaseFilterOperator {
+  private static final String EXPLAIN_NAME = "FILTER_MAP_INDEX";
+
+  private final JsonMatchFilterOperator _jsonMatchOperator;
+  private final ExpressionFilterOperator _expressionFilterOperator;
+  private final String _columnName;
+  private final String _keyName;
+  private final Predicate _predicate;
+  private final JsonMatchPredicate _jsonMatchPredicate;
+
+  public MapIndexFilterOperator(IndexSegment indexSegment, Predicate 
predicate, QueryContext queryContext,
+      int numDocs) {
+    super(numDocs, false);
+    _predicate = predicate;
+
+    // Get column name and key name from function arguments
+    List<ExpressionContext> arguments = 
predicate.getLhs().getFunction().getArguments();
+    if (arguments.size() != 2) {
+      throw new IllegalStateException("Expected two arguments (column name and 
key name), found: " + arguments.size());
+    }
+
+    _columnName = arguments.get(0).getIdentifier();
+    _keyName = cleanKey(String.valueOf(arguments.get(1).getLiteral()));

Review Comment:
   ```suggestion
       _keyName = arguments.get(1).getLiteral().getStringValue();
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/map/MutableMapDataSource.java:
##########
@@ -55,15 +55,13 @@ public MutableMapDataSource(FieldSpec fieldSpec, int 
numDocs, int numValues, int
             partitionFunction, partitions, minValue, maxValue, 
maxRowLengthInBytes),
         new 
ColumnIndexContainer.FromMap.Builder().withAll(mutableIndexes).build());
     _mutableIndexes = mutableIndexes;
-    MapIndexReader mapIndexReader = getMapIndex();
-    if (mapIndexReader == null) {
-      // Fallback to use forward index
-      ForwardIndexReader<?> forwardIndex = getForwardIndex();
-      if (forwardIndex instanceof MapIndexReader) {
-        mapIndexReader = (MapIndexReader) forwardIndex;
-      } else {
-        mapIndexReader = new MapIndexReaderWrapper(forwardIndex, 
getFieldSpec());
-      }
+    MapIndexReader mapIndexReader;
+    // Fallback to use forward index
+    ForwardIndexReader<?> forwardIndex = getForwardIndex();
+    if (forwardIndex instanceof MapIndexReader) {
+      mapIndexReader = (MapIndexReader) forwardIndex;
+    } else {
+      mapIndexReader = new MapIndexReaderWrapper(forwardIndex, getFieldSpec());

Review Comment:
   Not introduced in this PR, but what is the difference between them? Why do 
we need a wrapper?



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapIndexFilterOperator.java:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.CaseFormat;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotInPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+
+
+/**
+ * Filter operator for Map matching that internally uses 
JsonMatchFilterOperator or ExpressionFilterOperator.
+ * This operator converts map predicates to JSON predicates and delegates 
filtering operations
+ * to JsonMatchFilterOperator.
+ */
+public class MapIndexFilterOperator extends BaseFilterOperator {
+  private static final String EXPLAIN_NAME = "FILTER_MAP_INDEX";
+
+  private final JsonMatchFilterOperator _jsonMatchOperator;
+  private final ExpressionFilterOperator _expressionFilterOperator;
+  private final String _columnName;
+  private final String _keyName;
+  private final Predicate _predicate;
+  private final JsonMatchPredicate _jsonMatchPredicate;

Review Comment:
   (minor) Do we need this as a member variable? This class serves as a 
delegate to either json index based operator or udf based operator



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/JsonIndexCreator.java:
##########
@@ -34,7 +36,11 @@ public interface JsonIndexCreator extends IndexCreator {
   @Override
   default void add(@Nonnull Object value, int dictId)
       throws IOException {
-    add((String) value);
+    if (value instanceof Map) {

Review Comment:
   Can we improve it to directly add a `JsonNode` to avoid the extra ser/de?



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapIndexFilterOperator.java:
##########
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.CaseFormat;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotInPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
+
+
+/**
+ * Filter operator for Map matching that internally uses 
JsonMatchFilterOperator or ExpressionFilterOperator.
+ * This operator converts map predicates to JSON predicates and delegates 
filtering operations
+ * to JsonMatchFilterOperator.
+ */
+public class MapIndexFilterOperator extends BaseFilterOperator {
+  private static final String EXPLAIN_NAME = "FILTER_MAP_INDEX";
+
+  private final JsonMatchFilterOperator _jsonMatchOperator;
+  private final ExpressionFilterOperator _expressionFilterOperator;
+  private final String _columnName;
+  private final String _keyName;
+  private final Predicate _predicate;
+  private final JsonMatchPredicate _jsonMatchPredicate;
+
+  public MapIndexFilterOperator(IndexSegment indexSegment, Predicate 
predicate, QueryContext queryContext,
+      int numDocs) {
+    super(numDocs, false);
+    _predicate = predicate;
+
+    // Get column name and key name from function arguments
+    List<ExpressionContext> arguments = 
predicate.getLhs().getFunction().getArguments();
+    if (arguments.size() != 2) {
+      throw new IllegalStateException("Expected two arguments (column name and 
key name), found: " + arguments.size());
+    }
+
+    _columnName = arguments.get(0).getIdentifier();
+    _keyName = cleanKey(String.valueOf(arguments.get(1).getLiteral()));
+
+    // Get JSON index and create operator
+    DataSource dataSource = indexSegment.getDataSource(_columnName);
+    JsonIndexReader jsonIndex = dataSource.getJsonIndex();
+    if (jsonIndex != null && useJsonIndex(_predicate.getType())) {
+      _jsonMatchPredicate = createJsonMatchPredicate();
+      _jsonMatchOperator = initializeJsonMatchFilterOperator(jsonIndex, 
numDocs);
+      _expressionFilterOperator = null;
+    } else {
+      _jsonMatchPredicate = null;
+      _jsonMatchOperator = null;
+      _expressionFilterOperator = new ExpressionFilterOperator(indexSegment, 
queryContext, predicate, numDocs);
+    }
+  }
+
+  /**
+   * Creates a JsonMatchPredicate based on the original predicate type
+   */
+  private JsonMatchPredicate createJsonMatchPredicate() {
+    // Convert predicate to JSON format based on type
+    String jsonValue;
+    switch (_predicate.getType()) {
+      case EQ:
+        jsonValue = createJsonEqPredicateValue(_keyName, ((EqPredicate) 
_predicate).getValue());
+        break;
+      case NOT_EQ:
+        jsonValue = createJsonNotEqPredicateValue(_keyName, ((NotEqPredicate) 
_predicate).getValue());
+        break;
+      case IN:
+        jsonValue = createJsonInPredicateValue(_keyName, ((InPredicate) 
_predicate).getValues());
+        break;
+      case NOT_IN:
+        jsonValue = createJsonNotInPredicateValue(_keyName, ((NotInPredicate) 
_predicate).getValues());
+        break;
+      default:
+        throw new IllegalStateException(
+            "Unsupported predicate type for creating json match predicate: " + 
_predicate.getType());
+    }
+
+    // Create identifier expression for the JSON column
+    ExpressionContext jsonLhs = ExpressionContext.forIdentifier("json");
+    return new JsonMatchPredicate(jsonLhs, jsonValue, null);
+  }
+
+  @Override
+  protected BlockDocIdSet getTrues() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.getTrues();
+    } else {
+      return _expressionFilterOperator.getTrues();
+    }
+  }
+
+  @Override
+  public boolean canOptimizeCount() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.canOptimizeCount();
+    } else {
+      return _expressionFilterOperator.canOptimizeCount();
+    }
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.getNumMatchingDocs();
+    } else {
+      return _expressionFilterOperator.getNumMatchingDocs();
+    }
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.canProduceBitmaps();
+    } else {
+      return _expressionFilterOperator.canProduceBitmaps();
+    }
+  }
+
+  @Override
+  public BitmapCollection getBitmaps() {
+    if (_jsonMatchOperator != null) {
+      return _jsonMatchOperator.getBitmaps();
+    } else {
+      return _expressionFilterOperator.getBitmaps();
+    }
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String toExplainString() {
+    StringBuilder stringBuilder =
+        new 
StringBuilder(EXPLAIN_NAME).append("(column:").append(_columnName).append(",key:").append(_keyName)
+            
.append(",indexLookUp:map_index").append(",operator:").append(_predicate.getType()).append(",predicate:")
+            .append(_predicate);
+
+    if (_jsonMatchOperator != null) {
+      stringBuilder.append(",delegateTo:json_match");
+    } else {
+      stringBuilder.append(",delegateTo:expression_filter");
+    }
+
+    return stringBuilder.append(')').toString();
+  }
+
+  @Override
+  protected String getExplainName() {
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, 
EXPLAIN_NAME);
+  }
+
+  @Override
+  protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
+    super.explainAttributes(attributeBuilder);
+    attributeBuilder.putString("column", _columnName);
+    attributeBuilder.putString("key", _keyName);
+    attributeBuilder.putString("indexLookUp", "map_index");
+    attributeBuilder.putString("operator", _predicate.getType().name());
+    attributeBuilder.putString("predicate", _predicate.toString());
+
+    if (_jsonMatchOperator != null) {
+      attributeBuilder.putString("delegateTo", "json_match");
+    } else {
+      attributeBuilder.putString("delegateTo", "expression_filter");
+    }
+  }
+
+  /**
+   * Initializes the JsonMatchFilterOperator with the given JsonIndexReader
+   */
+  private JsonMatchFilterOperator 
initializeJsonMatchFilterOperator(JsonIndexReader jsonIndex, int numDocs) {
+    return new JsonMatchFilterOperator(jsonIndex, _jsonMatchPredicate, 
numDocs);
+  }
+
+  private String createJsonEqPredicateValue(String key, String value) {

Review Comment:
   Does it work? Do you need to quote the value if it is string?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to