This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 103b13e39a ExpressionFilterOperator NULL support. (#11220)
103b13e39a is described below

commit 103b13e39a5b0c24113fdaed9bdd6b260f76c76e
Author: Shen Yu <s...@startree.ai>
AuthorDate: Tue Aug 1 21:39:32 2023 -0700

    ExpressionFilterOperator NULL support. (#11220)
---
 .../ExpressionScanDocIdIterator.java               | 157 +++++++++++++++++----
 .../operator/docidsets/ExpressionDocIdSet.java     |   6 +-
 .../operator/filter/ExpressionFilterOperator.java  |  14 +-
 .../queries/NullHandlingEnabledQueriesTest.java    |  90 ++++++++++++
 4 files changed, 237 insertions(+), 30 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
index 6d9e770a70..455812f4d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
@@ -53,8 +53,9 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator
   private final PredicateEvaluator _predicateEvaluator;
   private final Map<String, DataSource> _dataSourceMap;
   private final int _endDocId;
-
   private final int[] _docIdBuffer = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+  private final boolean _nullHandlingEnabled;
+  private final PredicateEvaluationResult _predicateEvaluationResult;
 
   private int _blockEndDocId = 0;
   private PeekableIntIterator _docIdIterator;
@@ -64,11 +65,14 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator
   private long _numEntriesScanned = 0L;
 
   public ExpressionScanDocIdIterator(TransformFunction transformFunction, PredicateEvaluator predicateEvaluator,
-      Map<String, DataSource> dataSourceMap, int numDocs) {
+      Map<String, DataSource> dataSourceMap, int numDocs, boolean nullHandlingEnabled,
+      PredicateEvaluationResult predicateEvaluationResult) {
     _transformFunction = transformFunction;
     _predicateEvaluator = predicateEvaluator;
     _dataSourceMap = dataSourceMap;
     _endDocId = numDocs;
+    _nullHandlingEnabled = nullHandlingEnabled;
+    _predicateEvaluationResult = predicateEvaluationResult;
   }
 
   @Override
@@ -144,68 +148,164 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator
     TransformResultMetadata resultMetadata = _transformFunction.getResultMetadata();
     if (resultMetadata.isSingleValue()) {
       _numEntriesScanned += numDocs;
+      boolean predicateEvaluationResult = _predicateEvaluationResult == PredicateEvaluationResult.TRUE;
+      RoaringBitmap nullBitmap = null;
       if (resultMetadata.hasDictionary()) {
         int[] dictIds = _transformFunction.transformToDictIdsSV(projectionBlock);
-        for (int i = 0; i < numDocs; i++) {
-          if (_predicateEvaluator.applySV(dictIds[i])) {
-            matchingDocIds.add(_docIdBuffer[i]);
+        if (_nullHandlingEnabled) {
+          nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+        }
+        if (nullBitmap != null && !nullBitmap.isEmpty()) {
+          for (int i = 0; i < numDocs; i++) {
+            if (_predicateEvaluator.applySV(dictIds[i]) == predicateEvaluationResult && !nullBitmap.contains(i)) {
+              matchingDocIds.add(_docIdBuffer[i]);
+            }
+          }
+        } else {
+          for (int i = 0; i < numDocs; i++) {
+            if (_predicateEvaluator.applySV(dictIds[i]) == predicateEvaluationResult) {
+              matchingDocIds.add(_docIdBuffer[i]);
+            }
           }
         }
       } else {
         switch (resultMetadata.getDataType().getStoredType()) {
           case INT:
             int[] intValues = _transformFunction.transformToIntValuesSV(projectionBlock);
-            for (int i = 0; i < numDocs; i++) {
-              if (_predicateEvaluator.applySV(intValues[i])) {
-                matchingDocIds.add(_docIdBuffer[i]);
+            if (_nullHandlingEnabled) {
+              nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+            }
+            if (nullBitmap != null && !nullBitmap.isEmpty()) {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(intValues[i]) == predicateEvaluationResult && !nullBitmap.contains(i)) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
+              }
+            } else {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(intValues[i]) == predicateEvaluationResult) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
               }
             }
             break;
           case LONG:
             long[] longValues = _transformFunction.transformToLongValuesSV(projectionBlock);
-            for (int i = 0; i < numDocs; i++) {
-              if (_predicateEvaluator.applySV(longValues[i])) {
-                matchingDocIds.add(_docIdBuffer[i]);
+            if (_nullHandlingEnabled) {
+              nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+            }
+            if (nullBitmap != null && !nullBitmap.isEmpty()) {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(longValues[i]) == predicateEvaluationResult && !nullBitmap.contains(
+                    i)) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
+              }
+            } else {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(longValues[i]) == predicateEvaluationResult) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
               }
             }
             break;
           case FLOAT:
             float[] floatValues = _transformFunction.transformToFloatValuesSV(projectionBlock);
-            for (int i = 0; i < numDocs; i++) {
-              if (_predicateEvaluator.applySV(floatValues[i])) {
-                matchingDocIds.add(_docIdBuffer[i]);
+            if (_nullHandlingEnabled) {
+              nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+            }
+            if (nullBitmap != null && !nullBitmap.isEmpty()) {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(floatValues[i]) == predicateEvaluationResult && !nullBitmap.contains(
+                    i)) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
+              }
+            } else {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(floatValues[i]) == predicateEvaluationResult) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
               }
             }
             break;
           case DOUBLE:
             double[] doubleValues = _transformFunction.transformToDoubleValuesSV(projectionBlock);
-            for (int i = 0; i < numDocs; i++) {
-              if (_predicateEvaluator.applySV(doubleValues[i])) {
-                matchingDocIds.add(_docIdBuffer[i]);
+            if (_nullHandlingEnabled) {
+              nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+            }
+            if (nullBitmap != null && !nullBitmap.isEmpty()) {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(doubleValues[i]) == predicateEvaluationResult && !nullBitmap.contains(
+                    i)) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
+              }
+            } else {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(doubleValues[i]) == predicateEvaluationResult) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
               }
             }
             break;
           case STRING:
             String[] stringValues = _transformFunction.transformToStringValuesSV(projectionBlock);
-            for (int i = 0; i < numDocs; i++) {
-              if (_predicateEvaluator.applySV(stringValues[i])) {
-                matchingDocIds.add(_docIdBuffer[i]);
+            if (_nullHandlingEnabled) {
+              nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+            }
+            if (nullBitmap != null && !nullBitmap.isEmpty()) {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(stringValues[i]) == predicateEvaluationResult && !nullBitmap.contains(
+                    i)) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
+              }
+            } else {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(stringValues[i]) == predicateEvaluationResult) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
               }
             }
             break;
           case BYTES:
             byte[][] bytesValues = _transformFunction.transformToBytesValuesSV(projectionBlock);
-            for (int i = 0; i < numDocs; i++) {
-              if (_predicateEvaluator.applySV(bytesValues[i])) {
-                matchingDocIds.add(_docIdBuffer[i]);
+            if (_nullHandlingEnabled) {
+              nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+            }
+            if (nullBitmap != null && !nullBitmap.isEmpty()) {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(bytesValues[i]) == predicateEvaluationResult && !nullBitmap.contains(
+                    i)) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
+              }
+            } else {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(bytesValues[i]) == predicateEvaluationResult) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
               }
             }
             break;
           case BIG_DECIMAL:
             BigDecimal[] bigDecimalValues = _transformFunction.transformToBigDecimalValuesSV(projectionBlock);
-            for (int i = 0; i < numDocs; i++) {
-              if (_predicateEvaluator.applySV(bigDecimalValues[i])) {
-                matchingDocIds.add(_docIdBuffer[i]);
+            if (_nullHandlingEnabled) {
+              nullBitmap = _transformFunction.getNullBitmap(projectionBlock);
+            }
+            if (nullBitmap != null && !nullBitmap.isEmpty()) {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(bigDecimalValues[i]) == predicateEvaluationResult
+                    && !nullBitmap.contains(i)) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
+              }
+            } else {
+              for (int i = 0; i < numDocs; i++) {
+                if (_predicateEvaluator.applySV(bigDecimalValues[i]) == predicateEvaluationResult) {
+                  matchingDocIds.add(_docIdBuffer[i]);
+                }
               }
             }
             break;
@@ -214,6 +314,7 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator
         }
       }
     } else {
+      // TODO(https://github.com/apache/pinot/issues/10882): support NULL for multi-value.
       if (resultMetadata.hasDictionary()) {
         int[][] dictIdsArray = _transformFunction.transformToDictIdsMV(projectionBlock);
         for (int i = 0; i < numDocs; i++) {
@@ -326,4 +427,8 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator
       return Collections.emptyList();
     }
   }
+
+  public enum PredicateEvaluationResult {
+    TRUE, FALSE
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java
index 70322ac525..00c174d4a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java
@@ -30,8 +30,10 @@ public final class ExpressionDocIdSet implements BlockDocIdSet {
   private final ExpressionScanDocIdIterator _docIdIterator;
 
   public ExpressionDocIdSet(TransformFunction transformFunction, PredicateEvaluator predicateEvaluator,
-      Map<String, DataSource> dataSourceMap, int numDocs) {
-    _docIdIterator = new ExpressionScanDocIdIterator(transformFunction, predicateEvaluator, dataSourceMap, numDocs);
+      Map<String, DataSource> dataSourceMap, int numDocs, boolean nullHandlingEnabled,
+      ExpressionScanDocIdIterator.PredicateEvaluationResult predicateEvaluationResult) {
+    _docIdIterator = new ExpressionScanDocIdIterator(transformFunction, predicateEvaluator, dataSourceMap, numDocs,
+        nullHandlingEnabled, predicateEvaluationResult);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
index 6aeee82db6..4338dd8dee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.ColumnContext;
+import org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator;
 import org.apache.pinot.core.operator.docidsets.ExpressionDocIdSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
@@ -43,12 +44,14 @@ import org.apache.pinot.segment.spi.datasource.DataSource;
 public class ExpressionFilterOperator extends BaseFilterOperator {
   private static final String EXPLAIN_NAME = "FILTER_EXPRESSION";
 
+  private final QueryContext _queryContext;
   private final Map<String, DataSource> _dataSourceMap;
   private final TransformFunction _transformFunction;
   private final PredicateEvaluator _predicateEvaluator;
 
   public ExpressionFilterOperator(IndexSegment segment, QueryContext queryContext, Predicate predicate, int numDocs) {
     super(numDocs, queryContext.isNullHandlingEnabled());
+    _queryContext = queryContext;
 
     Set<String> columns = new HashSet<>();
     ExpressionContext lhs = predicate.getLhs();
@@ -61,7 +64,7 @@ public class ExpressionFilterOperator extends BaseFilterOperator {
       _dataSourceMap.put(column, dataSource);
       columnContextMap.put(column, ColumnContext.fromDataSource(dataSource));
     });
-    _transformFunction = TransformFunctionFactory.get(lhs, columnContextMap, queryContext);
+    _transformFunction = TransformFunctionFactory.get(lhs, columnContextMap, _queryContext);
     _predicateEvaluator =
         PredicateEvaluatorProvider.getPredicateEvaluator(predicate, _transformFunction.getDictionary(),
             _transformFunction.getResultMetadata().getDataType());
@@ -69,7 +72,14 @@ public class ExpressionFilterOperator extends BaseFilterOperator {
 
   @Override
   protected BlockDocIdSet getTrues() {
-    return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs);
+    return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs,
+        _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.TRUE);
+  }
+
+  @Override
+  protected BlockDocIdSet getFalses() {
+    return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs,
+        _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.FALSE);
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java
index 0338d3d756..60ef65f181 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java
@@ -628,4 +628,94 @@ public class NullHandlingEnabledQueriesTest extends BaseQueriesTest {
     assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
     assertArrayEquals(rows.get(0), new Object[]{true});
   }
+
+  @Test
+  public void testAdditionExpressionFilterOperator()
+      throws Exception {
+    initializeRows();
+    insertRow(null);
+    insertRow(Integer.MIN_VALUE);
+    insertRow(1);
+    insertRow(-1);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT).build();
+    setUpSegments(tableConfig, schema);
+    String query = String.format("SELECT %s FROM testTable WHERE add(%s, 0) < 0", COLUMN1, COLUMN1);
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS);
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES * 2);
+  }
+
+  @Test
+  public void testAdditionExpressionFilterOperatorInsideNotFilterOperator()
+      throws Exception {
+    initializeRows();
+    insertRow(null);
+    insertRow(Integer.MIN_VALUE);
+    insertRow(1);
+    insertRow(-1);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT).build();
+    setUpSegments(tableConfig, schema);
+    String query = String.format("SELECT %s FROM testTable WHERE NOT(add(%s, 0) > 0)", COLUMN1, COLUMN1);
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS);
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES * 2);
+  }
+
+  @Test
+  public void testGreatestExpressionFilterOperator()
+      throws Exception {
+    initializeRows();
+    insertRowWithTwoColumns(null, null);
+    insertRowWithTwoColumns(Integer.MIN_VALUE, Integer.MIN_VALUE);
+    insertRowWithTwoColumns(null, 1);
+    insertRowWithTwoColumns(1, null);
+    insertRowWithTwoColumns(-1, -1);
+    insertRowWithTwoColumns(-1, null);
+    insertRowWithTwoColumns(null, -1);
+    insertRowWithTwoColumns(1, 1);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT)
+        .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+    setUpSegments(tableConfig, schema);
+    String query =
+        String.format("SELECT %s, %s FROM testTable WHERE GREATEST(%s, %s) < 0 LIMIT 100", COLUMN1, COLUMN2, COLUMN1,
+            COLUMN2);
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS);
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES * 4);
+  }
+
+  @Test
+  public void testExpressionFilterOperatorApplyAndForGetFalses()
+      throws Exception {
+    initializeRows();
+    insertRowWithTwoColumns(null, null);
+    insertRowWithTwoColumns(Integer.MIN_VALUE, null);
+    insertRowWithTwoColumns(1, null);
+    insertRowWithTwoColumns(-1, 1);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT)
+        .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build();
+    setUpSegments(tableConfig, schema);
+    String query =
+        String.format("SELECT %s FROM testTable WHERE NOT(add(%s, 0) > 0) AND %s IS NULL", COLUMN1, COLUMN1, COLUMN2);
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS);
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES);
+    assertArrayEquals(rows.get(0), new Object[]{Integer.MIN_VALUE});
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to