This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 33fa27704f ExpressionFilterOperator IS NULL and IS NOT NULL support. (#11249) 33fa27704f is described below commit 33fa27704f04076998f6a34c5199207989100794 Author: Shen Yu <s...@startree.ai> AuthorDate: Thu Aug 3 22:23:19 2023 -0700 ExpressionFilterOperator IS NULL and IS NOT NULL support. (#11249) --- .../ExpressionScanDocIdIterator.java | 26 ++- .../operator/docidsets/ExpressionDocIdSet.java | 3 +- .../operator/filter/ExpressionFilterOperator.java | 39 ++++- .../org/apache/pinot/core/plan/FilterPlanNode.java | 3 +- .../queries/NullHandlingEnabledQueriesTest.java | 191 +++++++++++++++++++++ 5 files changed, 247 insertions(+), 15 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java index 0bec70df69..f97d0ccd57 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.OptionalInt; +import javax.annotation.Nullable; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.BitmapDocIdSetOperator; @@ -64,9 +65,9 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator // the expression, but we only track the number of entries scanned for the resolved expression. private long _numEntriesScanned = 0L; - public ExpressionScanDocIdIterator(TransformFunction transformFunction, PredicateEvaluator predicateEvaluator, - Map<String, DataSource> dataSourceMap, int numDocs, boolean nullHandlingEnabled, - PredicateEvaluationResult predicateEvaluationResult) { + public ExpressionScanDocIdIterator(TransformFunction transformFunction, + @Nullable PredicateEvaluator predicateEvaluator, Map<String, DataSource> dataSourceMap, int numDocs, + boolean nullHandlingEnabled, PredicateEvaluationResult predicateEvaluationResult) { _transformFunction = transformFunction; _predicateEvaluator = predicateEvaluator; _dataSourceMap = dataSourceMap; @@ -146,10 +147,20 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator private void processProjectionBlock(ProjectionBlock projectionBlock, BitmapDataProvider matchingDocIds) { int numDocs = projectionBlock.getNumDocs(); TransformResultMetadata resultMetadata = _transformFunction.getResultMetadata(); - boolean predicateEvaluationResult = _predicateEvaluationResult == PredicateEvaluationResult.TRUE; if (resultMetadata.isSingleValue()) { _numEntriesScanned += numDocs; RoaringBitmap nullBitmap = null; + if (_predicateEvaluationResult == PredicateEvaluationResult.NULL) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + if (nullBitmap != null) { + for (int i : nullBitmap) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + return; + } + boolean predicateEvaluationResult = _predicateEvaluationResult == PredicateEvaluationResult.TRUE; + assert (_predicateEvaluator != null); if (resultMetadata.hasDictionary()) { int[] dictIds = _transformFunction.transformToDictIdsSV(projectionBlock); if (_nullHandlingEnabled) { @@ -315,6 +326,11 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator } } else { // TODO(https://github.com/apache/pinot/issues/10882): support NULL for multi-value. + if (_predicateEvaluationResult == PredicateEvaluationResult.NULL) { + return; + } + boolean predicateEvaluationResult = _predicateEvaluationResult == PredicateEvaluationResult.TRUE; + assert (_predicateEvaluator != null); if (resultMetadata.hasDictionary()) { int[][] dictIdsArray = _transformFunction.transformToDictIdsMV(projectionBlock); for (int i = 0; i < numDocs; i++) { @@ -429,6 +445,6 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator } public enum PredicateEvaluationResult { - TRUE, FALSE + TRUE, NULL, FALSE } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java index 00c174d4a6..d9fdcfbbcb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.operator.docidsets; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.core.common.BlockDocIdSet; import org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; @@ -29,7 +30,7 @@ import org.apache.pinot.segment.spi.datasource.DataSource; public final class ExpressionDocIdSet implements BlockDocIdSet { private final ExpressionScanDocIdIterator _docIdIterator; - public ExpressionDocIdSet(TransformFunction transformFunction, PredicateEvaluator predicateEvaluator, + public ExpressionDocIdSet(TransformFunction transformFunction, @Nullable PredicateEvaluator predicateEvaluator, Map<String, DataSource> dataSourceMap, int numDocs, boolean nullHandlingEnabled, ExpressionScanDocIdIterator.PredicateEvaluationResult predicateEvaluationResult) { _docIdIterator = new ExpressionScanDocIdIterator(transformFunction, predicateEvaluator, dataSourceMap, numDocs, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java index 4338dd8dee..7c0fe3b829 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java @@ -32,6 +32,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.ColumnContext; import org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator; import org.apache.pinot.core.operator.docidsets.ExpressionDocIdSet; +import org.apache.pinot.core.operator.docidsets.NotDocIdSet; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; import org.apache.pinot.core.operator.transform.function.TransformFunction; @@ -47,6 +48,7 @@ public class ExpressionFilterOperator extends BaseFilterOperator { private final QueryContext _queryContext; private final Map<String, DataSource> _dataSourceMap; private final TransformFunction _transformFunction; + private final Predicate.Type _predicateType; private final PredicateEvaluator _predicateEvaluator; public ExpressionFilterOperator(IndexSegment segment, QueryContext queryContext, Predicate predicate, int numDocs) { @@ -65,21 +67,44 @@ public class ExpressionFilterOperator extends BaseFilterOperator { columnContextMap.put(column, ColumnContext.fromDataSource(dataSource)); }); _transformFunction = TransformFunctionFactory.get(lhs, columnContextMap, _queryContext); - _predicateEvaluator = - PredicateEvaluatorProvider.getPredicateEvaluator(predicate, _transformFunction.getDictionary(), - _transformFunction.getResultMetadata().getDataType()); + _predicateType = predicate.getType(); + if (_predicateType == Predicate.Type.IS_NULL || _predicateType == Predicate.Type.IS_NOT_NULL) { + _predicateEvaluator = null; + } else { + _predicateEvaluator = + PredicateEvaluatorProvider.getPredicateEvaluator(predicate, _transformFunction.getDictionary(), + _transformFunction.getResultMetadata().getDataType()); + } } @Override protected BlockDocIdSet getTrues() { - return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs, - _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.TRUE); + if (_predicateType == Predicate.Type.IS_NULL) { + return getNulls(); + } else if (_predicateType == Predicate.Type.IS_NOT_NULL) { + return new NotDocIdSet(getNulls(), _numDocs); + } else { + return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs, + _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.TRUE); + } + } + + @Override + protected BlockDocIdSet getNulls() { + return new ExpressionDocIdSet(_transformFunction, null, _dataSourceMap, _numDocs, + _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.NULL); } @Override protected BlockDocIdSet getFalses() { - return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs, - _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.FALSE); + if (_predicateType == Predicate.Type.IS_NULL) { + return new NotDocIdSet(getNulls(), _numDocs); + } else if (_predicateType == Predicate.Type.IS_NOT_NULL) { + return getNulls(); + } else { + return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs, + _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.FALSE); + } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java index af30f18e8f..6eafcbf170 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java @@ -241,8 +241,7 @@ public class FilterPlanNode implements PlanNode { } else if (canApplyH3IndexForInclusionCheck(predicate, lhs.getFunction())) { return new H3InclusionIndexFilterOperator(_indexSegment, _queryContext, predicate, numDocs); } else { - // TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (IS_NULL, - // IS_NOT_NULL, TEXT_MATCH) + // TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (TEXT_MATCH) return new ExpressionFilterOperator(_indexSegment, _queryContext, predicate, numDocs); } } else { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java index 10bad7a232..ef9386a955 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java @@ -897,4 +897,195 @@ public class NullHandlingEnabledQueriesTest extends BaseQueriesTest { List<Object[]> rows = resultTable.getRows(); assertEquals(rows.size(), 0); } + + @Test + public void testExpressionFilterOperatoIsNullPredicate() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(null, 1); + insertRowWithTwoColumns(1, 2); + insertRowWithTwoColumns(-1, 3); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s, %s FROM testTable WHERE ADD(%s, 0) IS NULL LIMIT 100", COLUMN1, COLUMN2, COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{null, 1}); + } + + @Test + public void testExpressionFilterOperatorIsNotNullPredicate() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(null, 1); + insertRowWithTwoColumns(null, 2); + insertRowWithTwoColumns(1, 3); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s, %s FROM testTable WHERE ADD(%s, 0) IS NOT NULL LIMIT 100", COLUMN1, COLUMN2, COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{1, 3}); + } + + @Test + public void testExpressionFilterOperatorIsNullPredicateInsideNotFilterOperator() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(null, 1); + insertRowWithTwoColumns(null, 2); + insertRowWithTwoColumns(1, 3); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s, %s FROM testTable WHERE NOT(ADD(%s, 0) IS NULL) LIMIT 100", COLUMN1, COLUMN2, + COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{1, 3}); + } + + @Test + public void testExpressionFilterOperatorIsNotNullPredicateInsideNotFilterOperator() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(null, 1); + insertRowWithTwoColumns(2, 3); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s, %s FROM testTable WHERE NOT(ADD(%s, 0) IS NOT NULL) LIMIT 100", COLUMN1, COLUMN2, + COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{null, 1}); + } + + @Test + public void testExpressionFilterOperatorApplyIsNullPredicateToNotOfColumn() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(true, 1); + insertRowWithTwoColumns(null, 2); + insertRowWithTwoColumns(false, 3); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.BOOLEAN) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s, %s FROM testTable WHERE (NOT %s) IS NULL LIMIT 100", COLUMN1, COLUMN2, COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{null, 2}); + } + + @Test + public void testExpressionFilterOperatorApplyAndForGetNulls() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(Integer.MIN_VALUE, null); + insertRowWithTwoColumns(1, null); + insertRowWithTwoColumns(-1, 1); + insertRowWithTwoColumns(null, null); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s, %s FROM testTable WHERE (add(%s, 0) IS NULL) AND (%s IS NULL)", COLUMN1, COLUMN2, + COLUMN1, COLUMN2); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{null, null}); + } + + @Test + public void testExpressionFilterOperatorOnMultiValue() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(new Integer[]{1, 2, 3}, 1); + insertRowWithTwoColumns(new Integer[]{2, 3, 4}, null); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addMultiValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT * FROM testTable WHERE (VALUEIN(%s, 2, 3) IN (2, 3)) AND (%s = 1)", COLUMN1, COLUMN2); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{new Integer[]{1, 2, 3}, 1}); + } + + @Test + public void testExpressionFilterOperatorMultiValueIsNull() + throws Exception { + initializeRows(); + insertRow(new Integer[]{1, 2, 3}); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addMultiValueDimension(COLUMN1, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = String.format("SELECT * FROM testTable WHERE (VALUEIN(%s, 2, 3) IS NULL)", COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 0); + } + + @Test + public void testExpressionFilterOperatorMultiValueIsNotNull() + throws Exception { + initializeRows(); + insertRow(new Integer[]{1, 2, 3}); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addMultiValueDimension(COLUMN1, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = String.format("SELECT * FROM testTable WHERE (VALUEIN(%s, 2, 3) IS NOT NULL)", COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{new Integer[]{1, 2, 3}}); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org