This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 103b13e39a ExpressionFilterOperator NULL support. (#11220) 103b13e39a is described below commit 103b13e39a5b0c24113fdaed9bdd6b260f76c76e Author: Shen Yu <s...@startree.ai> AuthorDate: Tue Aug 1 21:39:32 2023 -0700 ExpressionFilterOperator NULL support. (#11220) --- .../ExpressionScanDocIdIterator.java | 157 +++++++++++++++++---- .../operator/docidsets/ExpressionDocIdSet.java | 6 +- .../operator/filter/ExpressionFilterOperator.java | 14 +- .../queries/NullHandlingEnabledQueriesTest.java | 90 ++++++++++++ 4 files changed, 237 insertions(+), 30 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java index 6d9e770a70..455812f4d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java @@ -53,8 +53,9 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator private final PredicateEvaluator _predicateEvaluator; private final Map<String, DataSource> _dataSourceMap; private final int _endDocId; - private final int[] _docIdBuffer = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]; + private final boolean _nullHandlingEnabled; + private final PredicateEvaluationResult _predicateEvaluationResult; private int _blockEndDocId = 0; private PeekableIntIterator _docIdIterator; @@ -64,11 +65,14 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator private long _numEntriesScanned = 0L; public ExpressionScanDocIdIterator(TransformFunction transformFunction, PredicateEvaluator predicateEvaluator, - Map<String, DataSource> dataSourceMap, int numDocs) { + Map<String, DataSource> dataSourceMap, int numDocs, boolean nullHandlingEnabled, + PredicateEvaluationResult predicateEvaluationResult) { _transformFunction = transformFunction; _predicateEvaluator = predicateEvaluator; _dataSourceMap = dataSourceMap; _endDocId = numDocs; + _nullHandlingEnabled = nullHandlingEnabled; + _predicateEvaluationResult = predicateEvaluationResult; } @Override @@ -144,68 +148,164 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator TransformResultMetadata resultMetadata = _transformFunction.getResultMetadata(); if (resultMetadata.isSingleValue()) { _numEntriesScanned += numDocs; + boolean predicateEvaluationResult = _predicateEvaluationResult == PredicateEvaluationResult.TRUE; + RoaringBitmap nullBitmap = null; if (resultMetadata.hasDictionary()) { int[] dictIds = _transformFunction.transformToDictIdsSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(dictIds[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(dictIds[i]) == predicateEvaluationResult && !nullBitmap.contains(i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(dictIds[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } } else { switch (resultMetadata.getDataType().getStoredType()) { case INT: int[] intValues = _transformFunction.transformToIntValuesSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(intValues[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(intValues[i]) == predicateEvaluationResult && !nullBitmap.contains(i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(intValues[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } break; case LONG: long[] longValues = _transformFunction.transformToLongValuesSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(longValues[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(longValues[i]) == predicateEvaluationResult && !nullBitmap.contains( + i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(longValues[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } break; case FLOAT: float[] floatValues = _transformFunction.transformToFloatValuesSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(floatValues[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(floatValues[i]) == predicateEvaluationResult && !nullBitmap.contains( + i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(floatValues[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } break; case DOUBLE: double[] doubleValues = _transformFunction.transformToDoubleValuesSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(doubleValues[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(doubleValues[i]) == predicateEvaluationResult && !nullBitmap.contains( + i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(doubleValues[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } break; case STRING: String[] stringValues = _transformFunction.transformToStringValuesSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(stringValues[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(stringValues[i]) == predicateEvaluationResult && !nullBitmap.contains( + i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(stringValues[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } break; case BYTES: byte[][] bytesValues = _transformFunction.transformToBytesValuesSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(bytesValues[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(bytesValues[i]) == predicateEvaluationResult && !nullBitmap.contains( + i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(bytesValues[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } break; case BIG_DECIMAL: BigDecimal[] bigDecimalValues = _transformFunction.transformToBigDecimalValuesSV(projectionBlock); - for (int i = 0; i < numDocs; i++) { - if (_predicateEvaluator.applySV(bigDecimalValues[i])) { - matchingDocIds.add(_docIdBuffer[i]); + if (_nullHandlingEnabled) { + nullBitmap = _transformFunction.getNullBitmap(projectionBlock); + } + if (nullBitmap != null && !nullBitmap.isEmpty()) { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(bigDecimalValues[i]) == predicateEvaluationResult + && !nullBitmap.contains(i)) { + matchingDocIds.add(_docIdBuffer[i]); + } + } + } else { + for (int i = 0; i < numDocs; i++) { + if (_predicateEvaluator.applySV(bigDecimalValues[i]) == predicateEvaluationResult) { + matchingDocIds.add(_docIdBuffer[i]); + } } } break; @@ -214,6 +314,7 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator } } } else { + // TODO(https://github.com/apache/pinot/issues/10882): support NULL for multi-value. if (resultMetadata.hasDictionary()) { int[][] dictIdsArray = _transformFunction.transformToDictIdsMV(projectionBlock); for (int i = 0; i < numDocs; i++) { @@ -326,4 +427,8 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator return Collections.emptyList(); } } + + public enum PredicateEvaluationResult { + TRUE, FALSE + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java index 70322ac525..00c174d4a6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/ExpressionDocIdSet.java @@ -30,8 +30,10 @@ public final class ExpressionDocIdSet implements BlockDocIdSet { private final ExpressionScanDocIdIterator _docIdIterator; public ExpressionDocIdSet(TransformFunction transformFunction, PredicateEvaluator predicateEvaluator, - Map<String, DataSource> dataSourceMap, int numDocs) { - _docIdIterator = new ExpressionScanDocIdIterator(transformFunction, predicateEvaluator, dataSourceMap, numDocs); + Map<String, DataSource> dataSourceMap, int numDocs, boolean nullHandlingEnabled, + ExpressionScanDocIdIterator.PredicateEvaluationResult predicateEvaluationResult) { + _docIdIterator = new ExpressionScanDocIdIterator(transformFunction, predicateEvaluator, dataSourceMap, numDocs, + nullHandlingEnabled, predicateEvaluationResult); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java index 6aeee82db6..4338dd8dee 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java @@ -30,6 +30,7 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.common.BlockDocIdSet; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.ColumnContext; +import org.apache.pinot.core.operator.dociditerators.ExpressionScanDocIdIterator; import org.apache.pinot.core.operator.docidsets.ExpressionDocIdSet; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; @@ -43,12 +44,14 @@ import org.apache.pinot.segment.spi.datasource.DataSource; public class ExpressionFilterOperator extends BaseFilterOperator { private static final String EXPLAIN_NAME = "FILTER_EXPRESSION"; + private final QueryContext _queryContext; private final Map<String, DataSource> _dataSourceMap; private final TransformFunction _transformFunction; private final PredicateEvaluator _predicateEvaluator; public ExpressionFilterOperator(IndexSegment segment, QueryContext queryContext, Predicate predicate, int numDocs) { super(numDocs, queryContext.isNullHandlingEnabled()); + _queryContext = queryContext; Set<String> columns = new HashSet<>(); ExpressionContext lhs = predicate.getLhs(); @@ -61,7 +64,7 @@ public class ExpressionFilterOperator extends BaseFilterOperator { _dataSourceMap.put(column, dataSource); columnContextMap.put(column, ColumnContext.fromDataSource(dataSource)); }); - _transformFunction = TransformFunctionFactory.get(lhs, columnContextMap, queryContext); + _transformFunction = TransformFunctionFactory.get(lhs, columnContextMap, _queryContext); _predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, _transformFunction.getDictionary(), _transformFunction.getResultMetadata().getDataType()); @@ -69,7 +72,14 @@ public class ExpressionFilterOperator extends BaseFilterOperator { @Override protected BlockDocIdSet getTrues() { - return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs); + return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs, + _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.TRUE); + } + + @Override + protected BlockDocIdSet getFalses() { + return new ExpressionDocIdSet(_transformFunction, _predicateEvaluator, _dataSourceMap, _numDocs, + _queryContext.isNullHandlingEnabled(), ExpressionScanDocIdIterator.PredicateEvaluationResult.FALSE); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java index 0338d3d756..60ef65f181 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullHandlingEnabledQueriesTest.java @@ -628,4 +628,94 @@ public class NullHandlingEnabledQueriesTest extends BaseQueriesTest { assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); assertArrayEquals(rows.get(0), new Object[]{true}); } + + @Test + public void testAdditionExpressionFilterOperator() + throws Exception { + initializeRows(); + insertRow(null); + insertRow(Integer.MIN_VALUE); + insertRow(1); + insertRow(-1); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = String.format("SELECT %s FROM testTable WHERE add(%s, 0) < 0", COLUMN1, COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES * 2); + } + + @Test + public void testAdditionExpressionFilterOperatorInsideNotFilterOperator() + throws Exception { + initializeRows(); + insertRow(null); + insertRow(Integer.MIN_VALUE); + insertRow(1); + insertRow(-1); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = String.format("SELECT %s FROM testTable WHERE NOT(add(%s, 0) > 0)", COLUMN1, COLUMN1); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES * 2); + } + + @Test + public void testGreatestExpressionFilterOperator() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(null, null); + insertRowWithTwoColumns(Integer.MIN_VALUE, Integer.MIN_VALUE); + insertRowWithTwoColumns(null, 1); + insertRowWithTwoColumns(1, null); + insertRowWithTwoColumns(-1, -1); + insertRowWithTwoColumns(-1, null); + insertRowWithTwoColumns(null, -1); + insertRowWithTwoColumns(1, 1); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s, %s FROM testTable WHERE GREATEST(%s, %s) < 0 LIMIT 100", COLUMN1, COLUMN2, COLUMN1, + COLUMN2); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES * 4); + } + + @Test + public void testExpressionFilterOperatorApplyAndForGetFalses() + throws Exception { + initializeRows(); + insertRowWithTwoColumns(null, null); + insertRowWithTwoColumns(Integer.MIN_VALUE, null); + insertRowWithTwoColumns(1, null); + insertRowWithTwoColumns(-1, 1); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(COLUMN1, FieldSpec.DataType.INT) + .addSingleValueDimension(COLUMN2, FieldSpec.DataType.INT).build(); + setUpSegments(tableConfig, schema); + String query = + String.format("SELECT %s FROM testTable WHERE NOT(add(%s, 0) > 0) AND %s IS NULL", COLUMN1, COLUMN1, COLUMN2); + + BrokerResponseNative brokerResponse = getBrokerResponse(query, QUERY_OPTIONS); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), NUM_OF_SEGMENT_COPIES); + assertArrayEquals(rows.get(0), new Object[]{Integer.MIN_VALUE}); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org