This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6fd21f28cb Add IGNORE NULLS option to FIRST_VALUE and LAST_VALUE window functions (#14264) 6fd21f28cb is described below commit 6fd21f28cb6214668c5724d1ca4062b4a48ee19b Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Tue Oct 29 04:59:46 2024 +0530 Add IGNORE NULLS option to FIRST_VALUE and LAST_VALUE window functions (#14264) --- .../pinot/common/collections/DualValueList.java | 54 + pinot-common/src/main/proto/expressions.proto | 1 + .../tests/NullHandlingIntegrationTest.java | 25 + .../pinot/calcite/sql/fun/PinotOperatorTable.java | 41 +- .../planner/logical/PlanNodeToRelConverter.java | 2 +- .../pinot/query/planner/logical/RexExpression.java | 11 +- .../query/planner/logical/RexExpressionUtils.java | 4 +- .../serde/ProtoExpressionToRexExpression.java | 2 +- .../serde/RexExpressionToProtoExpression.java | 10 +- .../window/aggregate/AggregateWindowFunction.java | 2 +- .../window/value/FirstValueWindowFunction.java | 186 +++- .../window/value/LastValueWindowFunction.java | 175 +++- .../window/value/LeadValueWindowFunction.java | 4 +- .../operator/window/value/ValueWindowFunction.java | 10 + .../operator/WindowAggregateOperatorTest.java | 1046 +++++++++++++++----- .../test/resources/queries/WindowFunctions.json | 148 ++- 16 files changed, 1400 insertions(+), 321 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/collections/DualValueList.java b/pinot-common/src/main/java/org/apache/pinot/common/collections/DualValueList.java new file mode 100644 index 0000000000..6db8c63baa --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/collections/DualValueList.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.collections; + +import java.util.AbstractList; + + +/** + * An immutable list like the one returned by {@link java.util.Collections#nCopies(int, Object)}, but with two values + * (that are not interleaved) instead of a single one. Useful for avoiding unnecessary allocations. + */ +public class DualValueList<E> extends AbstractList<E> { + + private final E _firstValue; + private final E _secondValue; + private final int _firstCount; + private final int _totalSize; + + public DualValueList(E firstValue, int firstCount, E secondValue, int secondCount) { + _firstValue = firstValue; + _firstCount = firstCount; + _secondValue = secondValue; + _totalSize = firstCount + secondCount; + } + + @Override + public E get(int index) { + if (index < 0 || index >= _totalSize) { + throw new IndexOutOfBoundsException(index); + } + return index < _firstCount ? 
_firstValue : _secondValue; + } + + @Override + public int size() { + return _totalSize; + } +} diff --git a/pinot-common/src/main/proto/expressions.proto b/pinot-common/src/main/proto/expressions.proto index 83ce64f265..e402bd97c8 100644 --- a/pinot-common/src/main/proto/expressions.proto +++ b/pinot-common/src/main/proto/expressions.proto @@ -92,6 +92,7 @@ message FunctionCall { string functionName = 2; repeated Expression functionOperands = 3; bool isDistinct = 4; + bool ignoreNulls = 5; } message Expression { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java index cf3911a32e..ad673f2e50 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java @@ -326,6 +326,31 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet { assertTrue(response.get("resultTable").get("rows").get(0).get(0).isNull()); } + @Test + public void testWindowFunctionIgnoreNulls() + throws Exception { + // Window functions are only supported in the multi-stage query engine + setUseMultiStageQueryEngine(true); + String sqlQuery = + "SELECT salary, LAST_VALUE(salary) IGNORE NULLS OVER (ORDER BY DaysSinceEpoch) AS gapfilledSalary from " + + "mytable"; + JsonNode response = postQuery(sqlQuery); + assertNoError(response); + + // Check if the LAST_VALUE window function with IGNORE NULLS has effectively gap-filled the salary values + Integer lastSalary = null; + JsonNode rows = response.get("resultTable").get("rows"); + for (int i = 0; i < rows.size(); i++) { + JsonNode row = rows.get(i); + if (!row.get(0).isNull()) { + assertEquals(row.get(0).asInt(), row.get(1).asInt()); + lastSalary = row.get(0).asInt(); + } else { + 
assertEquals(lastSalary, row.get(1).numberValue()); + } + } + } + @Override protected void overrideBrokerConf(PinotConfiguration brokerConf) { brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING, "true"); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java index 94bd38e8e7..d549ca4149 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java @@ -32,6 +32,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.sql.fun.SqlLeadLagAggFunction; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.ReturnTypes; @@ -134,10 +135,6 @@ public class PinotOperatorTable implements SqlOperatorTable { SqlStdOperatorTable.MODE, SqlStdOperatorTable.MIN, SqlStdOperatorTable.MAX, - SqlStdOperatorTable.LAST_VALUE, - SqlStdOperatorTable.FIRST_VALUE, - SqlStdOperatorTable.LEAD, - SqlStdOperatorTable.LAG, SqlStdOperatorTable.AVG, SqlStdOperatorTable.STDDEV_POP, SqlStdOperatorTable.COVAR_POP, @@ -152,7 +149,17 @@ public class PinotOperatorTable implements SqlOperatorTable { SqlStdOperatorTable.RANK, SqlStdOperatorTable.ROW_NUMBER, + // WINDOW Functions (non-aggregate) + SqlStdOperatorTable.LAST_VALUE, + SqlStdOperatorTable.FIRST_VALUE, + // TODO: Replace these with SqlStdOperatorTable.LEAD and SqlStdOperatorTable.LAG when the function implementations + // are updated to support the IGNORE NULLS option. 
+ PinotLeadWindowFunction.INSTANCE, + PinotLagWindowFunction.INSTANCE, + // SPECIAL OPERATORS + SqlStdOperatorTable.IGNORE_NULLS, + SqlStdOperatorTable.RESPECT_NULLS, SqlStdOperatorTable.BETWEEN, SqlStdOperatorTable.SYMMETRIC_BETWEEN, SqlStdOperatorTable.NOT_BETWEEN, @@ -372,4 +379,30 @@ public class PinotOperatorTable implements SqlOperatorTable { public List<SqlOperator> getOperatorList() { return _operatorList; } + + private static class PinotLeadWindowFunction extends SqlLeadLagAggFunction { + static final SqlOperator INSTANCE = new PinotLeadWindowFunction(); + + public PinotLeadWindowFunction() { + super(SqlKind.LEAD); + } + + @Override + public boolean allowsNullTreatment() { + return false; + } + } + + private static class PinotLagWindowFunction extends SqlLeadLagAggFunction { + static final SqlOperator INSTANCE = new PinotLagWindowFunction(); + + public PinotLagWindowFunction() { + super(SqlKind.LAG); + } + + @Override + public boolean allowsNullTreatment() { + return false; + } + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java index 13bbd791b2..bc6ba26884 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java @@ -324,7 +324,7 @@ public final class PlanNodeToRelConverter { RelDataType relDataType = funCall.getDataType().toType(_builder.getTypeFactory()); Window.RexWinAggCall winCall = new Window.RexWinAggCall(aggFunction, relDataType, operands, aggCalls.size(), // same as the one used in LogicalWindow.create - funCall.isDistinct(), false); + funCall.isDistinct(), funCall.isIgnoreNulls()); aggCalls.add(winCall); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java index 3419fdc5bb..0df030f197 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java @@ -111,17 +111,20 @@ public interface RexExpression { private final List<RexExpression> _functionOperands; // whether the function is a distinct function. private final boolean _isDistinct; + // whether the function should ignore nulls (relevant to certain window functions like LAST_VALUE). + private final boolean _ignoreNulls; public FunctionCall(ColumnDataType dataType, String functionName, List<RexExpression> functionOperands) { - this(dataType, functionName, functionOperands, false); + this(dataType, functionName, functionOperands, false, false); } public FunctionCall(ColumnDataType dataType, String functionName, List<RexExpression> functionOperands, - boolean isDistinct) { + boolean isDistinct, boolean ignoreNulls) { _dataType = dataType; _functionName = functionName; _functionOperands = functionOperands; _isDistinct = isDistinct; + _ignoreNulls = ignoreNulls; } public ColumnDataType getDataType() { @@ -140,6 +143,10 @@ public interface RexExpression { return _isDistinct; } + public boolean isIgnoreNulls() { + return _ignoreNulls; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java index b23e12f224..d9b4af5fca 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java @@ -405,12 +405,12 @@ public class RexExpressionUtils { public static RexExpression.FunctionCall 
fromAggregateCall(AggregateCall aggregateCall) { return new RexExpression.FunctionCall(RelToPlanNodeConverter.convertToColumnDataType(aggregateCall.type), getFunctionName(aggregateCall.getAggregation()), fromRexNodes(aggregateCall.rexList), - aggregateCall.isDistinct()); + aggregateCall.isDistinct(), false); } public static RexExpression.FunctionCall fromWindowAggregateCall(Window.RexWinAggCall winAggCall) { return new RexExpression.FunctionCall(RelToPlanNodeConverter.convertToColumnDataType(winAggCall.type), - getFunctionName(winAggCall.op), fromRexNodes(winAggCall.operands), winAggCall.distinct); + getFunctionName(winAggCall.op), fromRexNodes(winAggCall.operands), winAggCall.distinct, winAggCall.ignoreNulls); } public static Integer getValueAsInt(@Nullable RexNode in) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java index 0a5e9dc68c..cca846958e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java @@ -58,7 +58,7 @@ public class ProtoExpressionToRexExpression { operands.add(convertExpression(protoOperand)); } return new RexExpression.FunctionCall(convertColumnDataType(functionCall.getDataType()), - functionCall.getFunctionName(), operands, functionCall.getIsDistinct()); + functionCall.getFunctionName(), operands, functionCall.getIsDistinct(), functionCall.getIgnoreNulls()); } public static RexExpression.Literal convertLiteral(Expressions.Literal literal) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java index a7304299a2..2053de0cc1 
100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java @@ -64,9 +64,13 @@ public class RexExpressionToProtoExpression { for (RexExpression operand : operands) { protoOperands.add(convertExpression(operand)); } - return Expressions.FunctionCall.newBuilder().setDataType(convertColumnDataType(functionCall.getDataType())) - .setFunctionName(functionCall.getFunctionName()).addAllFunctionOperands(protoOperands) - .setIsDistinct(functionCall.isDistinct()).build(); + return Expressions.FunctionCall.newBuilder() + .setDataType(convertColumnDataType(functionCall.getDataType())) + .setFunctionName(functionCall.getFunctionName()) + .addAllFunctionOperands(protoOperands) + .setIsDistinct(functionCall.isDistinct()) + .setIgnoreNulls(functionCall.isIgnoreNulls()) + .build(); } public static Expressions.Literal convertLiteral(RexExpression.Literal literal) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java index 835297a1b6..2bb7d6adff 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java @@ -83,7 +83,7 @@ public class AggregateWindowFunction extends WindowFunction { List<Object> result = new ArrayList<>(numRows); for (int i = 0; i < numRows; i++) { if (lowerBound >= numRows) { - // Fill the remaining rows with null + // Fill the remaining rows with null since all subsequent windows will be out of bounds for (int j = i; j < numRows; j++) { result.add(null); } diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java index 78dc5b198f..a1f84be32e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java @@ -20,11 +20,12 @@ package org.apache.pinot.query.runtime.operator.window.value; import com.google.common.base.Preconditions; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.calcite.rel.RelFieldCollation; +import org.apache.pinot.common.collections.DualValueList; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.query.planner.logical.RexExpression; @@ -42,15 +43,23 @@ public class FirstValueWindowFunction extends ValueWindowFunction { @Override public List<Object> processRows(List<Object[]> rows) { if (_windowFrame.isRowType()) { - return processRowsWindow(rows); + if (_ignoreNulls) { + return processRowsWindowIgnoreNulls(rows); + } else { + return processRowsWindow(rows); + } } else { - return processRangeWindow(rows); + if (_ignoreNulls) { + return processRangeWindowIgnoreNulls(rows); + } else { + return processRangeWindow(rows); + } } } private List<Object> processRowsWindow(List<Object[]> rows) { if (_windowFrame.isUnboundedPreceding() && _windowFrame.getUpperBound() >= 0) { - return processUnboundedPreceding(rows); + return fillAllWithValue(rows, extractValueFromRow(rows.get(0))); } int numRows = rows.size(); @@ -62,7 +71,7 @@ public class FirstValueWindowFunction extends ValueWindowFunction { for (int i = 0; i < numRows; i++) { if (lowerBound >= numRows) { - 
// Fill remaining rows with null + // Fill the remaining rows with null since all subsequent windows will be out of bounds for (int j = i; j < numRows; j++) { result.add(null); } @@ -78,12 +87,64 @@ public class FirstValueWindowFunction extends ValueWindowFunction { lowerBound++; upperBound = Math.min(upperBound + 1, numRows - 1); } + + return result; + } + + private List<Object> processRowsWindowIgnoreNulls(List<Object[]> rows) { + int numRows = rows.size(); + int lowerBound = _windowFrame.getLowerBound(); + int upperBound = Math.min(_windowFrame.getUpperBound(), numRows - 1); + + // Find first non-null value in the first window + int indexOfFirstNonNullValue = indexOfFirstNonNullValueInWindow(rows, Math.max(lowerBound, 0), upperBound); + List<Object> result = new ArrayList<>(numRows); + + for (int i = 0; i < numRows; i++) { + if (lowerBound >= numRows) { + // Fill the remaining rows with null since all subsequent windows will be out of bounds + for (int j = i; j < numRows; j++) { + result.add(null); + } + break; + } + + if (indexOfFirstNonNullValue != -1) { + result.add(extractValueFromRow(rows.get(indexOfFirstNonNullValue))); + } else { + result.add(null); + } + + // Slide the window forward by one row; check if indexOfFirstNonNullValue is the lower bound which will not be in + // the next window. If so, find the next non-null value. + if (lowerBound >= 0 && indexOfFirstNonNullValue == lowerBound) { + // Find first non-null value for the next window + indexOfFirstNonNullValue = + indexOfFirstNonNullValueInWindow(rows, lowerBound + 1, Math.min(upperBound + 1, numRows - 1)); + } + lowerBound++; + + // After the lower bound is updated, we also update the upper bound for the next window. The upper bound is only + // incremented if we're not already at the row boundary. 
If the upper bound is incremented, we also need to check + // if the new value being added into the window is non-null if the rest of the window has only null values (i.e., + // if indexOfFirstNonNullValue is -1). + if (upperBound < numRows - 1) { + upperBound++; + if (indexOfFirstNonNullValue == -1 && upperBound >= 0) { + Object value = extractValueFromRow(rows.get(upperBound)); + if (value != null) { + indexOfFirstNonNullValue = upperBound; + } + } + } + } + return result; } private List<Object> processRangeWindow(List<Object[]> rows) { if (_windowFrame.isUnboundedPreceding()) { - return processUnboundedPreceding(rows); + return fillAllWithValue(rows, extractValueFromRow(rows.get(0))); } // The lower bound has to be CURRENT ROW since we don't support RANGE windows with offset value @@ -113,12 +174,113 @@ public class FirstValueWindowFunction extends ValueWindowFunction { return result; } - private List<Object> processUnboundedPreceding(List<Object[]> rows) { + private List<Object> processRangeWindowIgnoreNulls(List<Object[]> rows) { int numRows = rows.size(); - assert numRows > 0; - Object value = extractValueFromRow(rows.get(0)); - Object[] result = new Object[numRows]; - Arrays.fill(result, value); - return Arrays.asList(result); + + if (_windowFrame.isUnboundedPreceding() && _windowFrame.isUnboundedFollowing()) { + // Find the first non-null value and fill it in all rows + int indexOfFirstNonNullValue = indexOfFirstNonNullValueInWindow(rows, 0, numRows - 1); + if (indexOfFirstNonNullValue == -1) { + // There's no non-null value + return Collections.nCopies(numRows, null); + } else { + return fillAllWithValue(rows, extractValueFromRow(rows.get(indexOfFirstNonNullValue))); + } + } + + if (_windowFrame.isUnboundedPreceding() && _windowFrame.isUpperBoundCurrentRow()) { + // Find the first non-null value and fill it in all rows starting from the first row of the peer group of the row + // with the first non-null value + int firstNonNullValueIndex = 
indexOfFirstNonNullValueInWindow(rows, 0, numRows - 1); + Key firstNonNullValueKey; + + // No non-null values + if (firstNonNullValueIndex == -1) { + return Collections.nCopies(numRows, null); + } else { + firstNonNullValueKey = AggregationUtils.extractRowKey(rows.get(firstNonNullValueIndex), _orderKeys); + } + + // Find the start of the peer group of the row with the first non-null value + int nullEndIndex; + for (nullEndIndex = 0; nullEndIndex < numRows; nullEndIndex++) { + Object[] row = rows.get(nullEndIndex); + Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys); + if (orderKey.equals(firstNonNullValueKey)) { + break; + } + } + + Object firstNonNullValue = extractValueFromRow(rows.get(firstNonNullValueIndex)); + return new DualValueList<>(null, nullEndIndex, firstNonNullValue, numRows - nullEndIndex); + } + + if (_windowFrame.isLowerBoundCurrentRow() && _windowFrame.isUpperBoundCurrentRow()) { + List<Object> result = new ArrayList<>(numRows); + Map<Key, Object> firstValueForKey = new HashMap<>(); + + for (Object[] row : rows) { + Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys); + Object value = extractValueFromRow(row); + + if (value != null) { + firstValueForKey.putIfAbsent(orderKey, value); + } + } + + for (Object[] row : rows) { + result.add(firstValueForKey.get(AggregationUtils.extractRowKey(row, _orderKeys))); + } + + return result; + } + + if (_windowFrame.isLowerBoundCurrentRow() && _windowFrame.isUnboundedFollowing()) { + List<Object> result = new ArrayList<>(numRows); + Map<Key, Object> firstValueForKey = new HashMap<>(); + + for (Object[] row : rows) { + Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys); + Object value = extractValueFromRow(row); + + if (value != null) { + firstValueForKey.putIfAbsent(orderKey, value); + } + } + + // Do a reverse iteration to get the first non-null value for each group. 
The first non-null value could either + // belong to the current group or any of the next groups if all the values in the current group are null. + Object prevNonNullValue = null; + for (int i = numRows - 1; i >= 0; i--) { + Object[] row = rows.get(i); + Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys); + Object value = firstValueForKey.get(orderKey); + + if (value != null) { + prevNonNullValue = value; + } + + result.add(prevNonNullValue); + } + + Collections.reverse(result); + return result; + } + + throw new IllegalStateException("RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); + } + + /** + * Both lowerBound and upperBound should be valid values for the given row set. The returned value is -1 if there is + * no non-null value in the window. + */ + private int indexOfFirstNonNullValueInWindow(List<Object[]> rows, int lowerBound, int upperBound) { + for (int i = lowerBound; i <= upperBound; i++) { + Object value = extractValueFromRow(rows.get(i)); + if (value != null) { + return i; + } + } + return -1; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java index 9c70722f8e..5383525d2d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java @@ -20,12 +20,12 @@ package org.apache.pinot.query.runtime.operator.window.value; import com.google.common.base.Preconditions; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.calcite.rel.RelFieldCollation; +import org.apache.pinot.common.collections.DualValueList; import 
org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.query.planner.logical.RexExpression; @@ -43,18 +43,26 @@ public class LastValueWindowFunction extends ValueWindowFunction { @Override public List<Object> processRows(List<Object[]> rows) { if (_windowFrame.isRowType()) { - return processRowsWindow(rows); + if (_ignoreNulls) { + return processRowsWindowIgnoreNulls(rows); + } else { + return processRowsWindow(rows); + } } else { - return processRangeWindow(rows); + if (_ignoreNulls) { + return processRangeWindowIgnoreNulls(rows); + } else { + return processRangeWindow(rows); + } } } private List<Object> processRowsWindow(List<Object[]> rows) { + int numRows = rows.size(); if (_windowFrame.isUnboundedFollowing() && _windowFrame.getLowerBound() <= 0) { - return processUnboundedFollowing(rows); + return fillAllWithValue(rows, extractValueFromRow(rows.get(numRows - 1))); } - int numRows = rows.size(); List<Object> result = new ArrayList<>(numRows); // lowerBound is guaranteed to be less than or equal to upperBound here (but both can be -ve / +ve) @@ -63,7 +71,7 @@ public class LastValueWindowFunction extends ValueWindowFunction { for (int i = 0; i < numRows; i++) { if (lowerBound >= numRows) { - // Fill remaining rows with null + // Fill the remaining rows with null since all subsequent windows will be out of bounds for (int j = i; j < numRows; j++) { result.add(null); } @@ -83,16 +91,64 @@ public class LastValueWindowFunction extends ValueWindowFunction { return result; } + private List<Object> processRowsWindowIgnoreNulls(List<Object[]> rows) { + int numRows = rows.size(); + int lowerBound = _windowFrame.getLowerBound(); + int upperBound = Math.min(_windowFrame.getUpperBound(), numRows - 1); + + // Find last non-null value in the first window + int indexOfLastNonNullValue = indexOfLastNonNullValueInWindow(rows, Math.max(0, lowerBound), upperBound); + + List<Object> result = new ArrayList<>(numRows); + for (int 
i = 0; i < numRows; i++) { + if (lowerBound >= numRows) { + // Fill the remaining rows with null since all subsequent windows will be out of bounds + for (int j = i; j < numRows; j++) { + result.add(null); + } + break; + } + + if (indexOfLastNonNullValue != -1) { + result.add(extractValueFromRow(rows.get(indexOfLastNonNullValue))); + } else { + result.add(null); + } + + // Slide the window forward by one row; check if indexOfLastNonNullValue is the lower bound which will not be in + // the next window. If so, update it to -1 since the window no longer has any non-null values. + if (indexOfLastNonNullValue == lowerBound) { + indexOfLastNonNullValue = -1; + } + lowerBound++; + + // After the lower bound is updated, we also update the upper bound for the next window. The upper bound is only + // incremented if we're not already at the row boundary. If the upper bound is incremented, we also need to update + // indexOfLastNonNullValue to the new upper bound if it contains a non-null value. + if (upperBound < numRows - 1) { + upperBound++; + if (upperBound >= 0) { + Object value = extractValueFromRow(rows.get(upperBound)); + if (value != null) { + indexOfLastNonNullValue = upperBound; + } + } + } + } + + return result; + } + private List<Object> processRangeWindow(List<Object[]> rows) { + int numRows = rows.size(); if (_windowFrame.isUnboundedFollowing()) { - return processUnboundedFollowing(rows); + return fillAllWithValue(rows, extractValueFromRow(rows.get(numRows - 1))); } // The upper bound has to be CURRENT ROW here since we don't support RANGE windows with offset value Preconditions.checkState(_windowFrame.isUpperBoundCurrentRow(), "RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); - int numRows = rows.size(); List<Object> result = new ArrayList<>(numRows); Map<Key, Object> lastValueForKey = new HashMap<>(); @@ -117,12 +173,103 @@ public class LastValueWindowFunction extends ValueWindowFunction { return result; } - private List<Object> 
processUnboundedFollowing(List<Object[]> rows) { + private List<Object> processRangeWindowIgnoreNulls(List<Object[]> rows) { int numRows = rows.size(); - assert numRows > 0; - Object value = extractValueFromRow(rows.get(numRows - 1)); - Object[] result = new Object[numRows]; - Arrays.fill(result, value); - return Arrays.asList(result); + + if (_windowFrame.isUnboundedPreceding() && _windowFrame.isUnboundedFollowing()) { + // Find the last non-null value and fill it in all rows + int indexOfLastNonNullValue = indexOfLastNonNullValueInWindow(rows, 0, numRows - 1); + if (indexOfLastNonNullValue == -1) { + // There's no non-null value + return Collections.nCopies(numRows, null); + } else { + return fillAllWithValue(rows, extractValueFromRow(rows.get(indexOfLastNonNullValue))); + } + } + + if (_windowFrame.isUnboundedPreceding() && _windowFrame.isUpperBoundCurrentRow()) { + List<Object> result = new ArrayList<>(numRows); + Map<Key, Object> lastValueForKey = new HashMap<>(); + Object lastNonNullValue = null; + + for (Object[] row : rows) { + Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys); + Object value = extractValueFromRow(row); + + if (value != null) { + lastValueForKey.put(orderKey, value); + lastNonNullValue = value; + } else { + lastValueForKey.putIfAbsent(orderKey, lastNonNullValue); + } + } + + for (Object[] row : rows) { + result.add(lastValueForKey.get(AggregationUtils.extractRowKey(row, _orderKeys))); + } + + return result; + } + + if (_windowFrame.isLowerBoundCurrentRow() && _windowFrame.isUpperBoundCurrentRow()) { + List<Object> result = new ArrayList<>(numRows); + Map<Key, Object> lastValueForKey = new HashMap<>(); + + for (Object[] row : rows) { + Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys); + Object value = extractValueFromRow(row); + + if (value != null) { + lastValueForKey.put(orderKey, value); + } + } + + for (Object[] row : rows) { + result.add(lastValueForKey.get(AggregationUtils.extractRowKey(row, _orderKeys))); + 
} + + return result; + } + + if (_windowFrame.isLowerBoundCurrentRow() && _windowFrame.isUnboundedFollowing()) { + // Get last non-null value and fill it in all rows from the first row till the last row of the peer group of the + // row with the non-null value + int indexOfLastNonNullValue = indexOfLastNonNullValueInWindow(rows, 0, numRows - 1); + Key lastNonNullValueKey; + + // No non-null values + if (indexOfLastNonNullValue == -1) { + return Collections.nCopies(numRows, null); + } else { + lastNonNullValueKey = AggregationUtils.extractRowKey(rows.get(indexOfLastNonNullValue), _orderKeys); + } + + // Find the end of the peer group of the last row with the non-null value + int fillBoundary; + for (fillBoundary = indexOfLastNonNullValue + 1; fillBoundary < numRows; fillBoundary++) { + if (!AggregationUtils.extractRowKey(rows.get(fillBoundary), _orderKeys).equals(lastNonNullValueKey)) { + break; + } + } + + Object lastNonNullValue = extractValueFromRow(rows.get(indexOfLastNonNullValue)); + return new DualValueList<>(lastNonNullValue, fillBoundary, null, numRows - fillBoundary); + } + + throw new IllegalStateException("RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); + } + + /** + * Both lowerBound and upperBound should be valid values for the given row set. The returned value is -1 if there is + * no non-null value in the window. 
+ */ + private int indexOfLastNonNullValueInWindow(List<Object[]> rows, int lowerBound, int upperBound) { + for (int i = upperBound; i >= lowerBound; i--) { + Object value = extractValueFromRow(rows.get(i)); + if (value != null) { + return i; + } + } + return -1; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java index ec9fed8ace..ed3adf03c7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java @@ -46,7 +46,7 @@ public class LeadValueWindowFunction extends ValueWindowFunction { if (numOperands > 1) { RexExpression secondOperand = operands.get(1); Preconditions.checkArgument(secondOperand instanceof RexExpression.Literal, - "Second operand (offset) of LAG function must be a literal"); + "Second operand (offset) of LEAD function must be a literal"); Object offsetValue = ((RexExpression.Literal) secondOperand).getValue(); if (offsetValue instanceof Number) { offset = ((Number) offsetValue).intValue(); @@ -55,7 +55,7 @@ public class LeadValueWindowFunction extends ValueWindowFunction { if (numOperands == 3) { RexExpression thirdOperand = operands.get(2); Preconditions.checkArgument(thirdOperand instanceof RexExpression.Literal, - "Third operand (default value) of LAG function must be a literal"); + "Third operand (default value) of LEAD function must be a literal"); RexExpression.Literal defaultValueLiteral = (RexExpression.Literal) thirdOperand; defaultValue = defaultValueLiteral.getValue(); if (defaultValue != null) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java index 33ac71c815..ce8a00d432 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.runtime.operator.window.value; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.calcite.rel.RelFieldCollation; @@ -40,8 +41,17 @@ public abstract class ValueWindowFunction extends WindowFunction { .build(); //@formatter:on + protected final boolean _ignoreNulls; + public ValueWindowFunction(RexExpression.FunctionCall aggCall, DataSchema inputSchema, List<RelFieldCollation> collations, WindowFrame windowFrame) { super(aggCall, inputSchema, collations, windowFrame); + _ignoreNulls = aggCall.isIgnoreNulls(); + } + + protected List<Object> fillAllWithValue(List<Object[]> rows, Object value) { + int numRows = rows.size(); + assert numRows > 0; + return Collections.nCopies(numRows, value); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java index a67726cd90..abb38b8d7d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java @@ -533,7 +533,6 @@ public class WindowAggregateOperatorTest { // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, keys, Map.of( 1, List.of( new Object[]{1, "foo", 1, null}, @@ -548,7 +547,6 @@ public 
class WindowAggregateOperatorTest { new Object[]{3, "and", 3, null}, new Object[]{3, "true", null, 3}) )); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @@ -581,7 +579,6 @@ public class WindowAggregateOperatorTest { // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, keys, Map.of( 1, List.of( new Object[]{1, "foo", 1, 200}, @@ -596,14 +593,12 @@ public class WindowAggregateOperatorTest { new Object[]{3, "and", 100, 200}, new Object[]{3, "true", 100, 3}) )); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testSumWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, frameType, Integer.MIN_VALUE, Integer.MAX_VALUE, getSum(new RexExpression.InputRef(1)), @@ -615,13 +610,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then (result should be the same for both window frame types): - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 59.0}, @@ -632,15 +625,13 @@ public class WindowAggregateOperatorTest { "B", List.of( new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 30.0} - ))); - //@formatter:on + ))); assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void 
testSumWithUnboundedPrecedingLowerAndCurrentRowUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, frameType, Integer.MIN_VALUE, 0, getSum(new RexExpression.InputRef(1)), @@ -652,13 +643,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14.0}, @@ -670,14 +659,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10.0}, new Object[]{"B", 20, 2005, 30.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithUnboundedPrecedingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, Integer.MIN_VALUE, 2, getSum(new RexExpression.InputRef(1)), @@ -689,13 +676,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 44.0}, @@ -707,14 +692,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 30.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() { 
// Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, Integer.MIN_VALUE, -2, getSum(new RexExpression.InputRef(1)), @@ -726,13 +709,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, null}, @@ -744,14 +725,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testSumWithCurrentRowLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, frameType, 0, Integer.MAX_VALUE, getSum(new RexExpression.InputRef(1)), @@ -763,13 +742,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 59.0}, @@ -781,14 +758,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 20.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void 
testSumWithCurrentRowLowerAndCurrentRowUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, frameType, 0, 0, getSum(new RexExpression.InputRef(1)), @@ -800,13 +775,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14.0}, @@ -818,14 +791,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10.0}, new Object[]{"B", 20, 2005, 20.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithCurrentRowLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 0, 2, getSum(new RexExpression.InputRef(1)), @@ -837,13 +808,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 44.0}, @@ -855,14 +824,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 20.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithOffsetPrecedingLowerAndUnboundedFollowingUpper() { // Given: - //@formatter:off 
WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, -1, Integer.MAX_VALUE, getSum(new RexExpression.InputRef(1)), @@ -874,13 +841,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 59.0}, @@ -892,14 +857,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 30.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithOffsetFollowingLowerAndUnboundedFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 1, Integer.MAX_VALUE, getSum(new RexExpression.InputRef(1)), @@ -911,13 +874,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 45.0}, @@ -929,14 +890,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20.0}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithOffsetPrecedingLowerAndCurrentRowUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, 
new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, -2, 0, getSum(new RexExpression.InputRef(1)), @@ -948,13 +907,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14.0}, @@ -966,14 +923,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10.0}, new Object[]{"B", 20, 2005, 30.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithOffsetPrecedingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, -1, 2, getSum(new RexExpression.InputRef(1)), @@ -985,13 +940,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 44.0}, @@ -1003,14 +956,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 30.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithVeryLargeOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, // Verify if overflows are handled correctly new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, -1, 2147483646, @@ -1023,13 
+974,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 59.0}, @@ -1041,14 +990,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 30.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithVeryLargeOffsetFollowingLower() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, // Verify if overflows are handled correctly new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 2147483646, 2147483647, @@ -1061,13 +1008,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, null}, @@ -1079,14 +1024,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithOffsetPrecedingLowerAndOffsetPrecedingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, -3, -2, getSum(new RexExpression.InputRef(1)), @@ -1098,13 +1041,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - 
//@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, null}, @@ -1116,14 +1057,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithOffsetFollowingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 1, 2, getSum(new RexExpression.InputRef(1)), @@ -1135,13 +1074,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 30.0}, @@ -1153,14 +1090,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20.0}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testSumWithSamePartitionAndCollationKey() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 0, RANGE, Integer.MIN_VALUE, 0, getSum(new RexExpression.InputRef(1)), @@ -1172,13 +1107,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off 
verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 59.0}, @@ -1190,14 +1123,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 30.0}, new Object[]{"B", 20, 2005, 30.0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testMinWithRowsWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 1, getMin(new RexExpression.InputRef(1)), @@ -1209,13 +1140,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 10}, @@ -1227,14 +1156,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 10} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testMinWithRangeWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, RANGE, 0, Integer.MAX_VALUE, getMin(new RexExpression.InputRef(1)), @@ -1247,13 +1174,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", null, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 10}, @@ -1266,14 +1191,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 
2000, 10}, new Object[]{"B", null, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testMaxWithRowsWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 2, getMax(new RexExpression.InputRef(1)), @@ -1285,13 +1208,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 20, 2000, 20}, @@ -1303,14 +1224,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testMaxWithRangeWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, RANGE, 0, 0, getMax(new RexExpression.InputRef(1)), @@ -1323,13 +1242,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 20, 2000}, new Object[]{"B", null, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1342,14 +1259,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 20, 2000, 20}, new Object[]{"B", null, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void 
testBoolAndWithRowsWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, ROWS, -2, -1, getBoolAnd(new RexExpression.InputRef(1)), @@ -1362,13 +1277,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000}, new Object[]{"B", 0, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 0, 2000, null}, @@ -1381,14 +1294,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000, null}, new Object[]{"B", 0, 2005, 1} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testBoolAndWithRangeWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, RANGE, Integer.MIN_VALUE, 0, getBoolAnd(new RexExpression.InputRef(1)), @@ -1401,13 +1312,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000}, new Object[]{"B", 0, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 0, 2000, 0}, @@ -1420,14 +1329,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000, 1}, new Object[]{"B", 0, 2005, 0} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testBoolOrWithRowsWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new 
ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, ROWS, 1, 2, getBoolOr(new RexExpression.InputRef(1)), @@ -1440,13 +1347,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000}, new Object[]{"B", 0, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 0, 2000, 1}, @@ -1459,14 +1364,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000, 0}, new Object[]{"B", 0, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testBoolOrWithRangeWindow() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, RANGE, Integer.MIN_VALUE, Integer.MAX_VALUE, getBoolOr(new RexExpression.InputRef(1)), @@ -1479,13 +1382,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000}, new Object[]{"B", 0, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 0, 2000, 1}, @@ -1498,14 +1399,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 1, 2000, 1}, new Object[]{"B", 0, 2005, 1} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testFirstValueWithUnboundedPrecedingLowerAndCurrentRowUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, 
frameType, Integer.MIN_VALUE, 0, getFirstValue(new RexExpression.InputRef(1)), @@ -1517,13 +1416,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1535,7 +1432,6 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 10} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @@ -1543,7 +1439,6 @@ public class WindowAggregateOperatorTest { public void testFirstValueWithUnboundedPrecedingLowerAndUnboundedFollowingUpper( WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, Integer.MIN_VALUE, Integer.MAX_VALUE, getFirstValue(new RexExpression.InputRef(1)), @@ -1555,13 +1450,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1573,14 +1466,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 10} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testFirstValueWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new 
ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, -2, getFirstValue(new RexExpression.InputRef(1)), @@ -1592,13 +1483,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, null}, @@ -1610,14 +1499,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testFirstValueWithUnboundedPrecedingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, 2, getFirstValue(new RexExpression.InputRef(1)), @@ -1629,13 +1516,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1647,14 +1532,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 10} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testFirstValueWithCurrentRowLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new 
ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, Integer.MAX_VALUE, getFirstValue(new RexExpression.InputRef(1)), @@ -1666,13 +1549,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1684,14 +1565,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testFirstValueWithCurrentRowLowerAndCurrentRowUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, 0, getFirstValue(new RexExpression.InputRef(1)), @@ -1703,13 +1582,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1721,14 +1598,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testFirstValueWithCurrentRowLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, 
INT, List.of(0), 2, ROWS, 0, 2, getFirstValue(new RexExpression.InputRef(1)), @@ -1740,13 +1615,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1758,14 +1631,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testFirstValueWithOffsetPrecedingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 2, getFirstValue(new RexExpression.InputRef(1)), @@ -1777,13 +1648,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1795,14 +1664,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 10} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testFirstValueWithOffsetPrecedingLowerAndOffsetPrecedingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -2, -1, getFirstValue(new RexExpression.InputRef(1)), @@ -1814,13 +1681,11 @@ public class 
WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, null}, @@ -1832,14 +1697,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, 10} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testFirstValueWithOffsetFollowingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 2, 3, getFirstValue(new RexExpression.InputRef(1)), @@ -1851,13 +1714,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 20}, @@ -1869,14 +1730,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testLastValueWithUnboundedPrecedingLowerAndCurrentRowUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, Integer.MIN_VALUE, 0, getLastValue(new RexExpression.InputRef(1)), @@ -1888,13 +1747,11 @@ public class 
WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -1906,7 +1763,6 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @@ -1914,7 +1770,6 @@ public class WindowAggregateOperatorTest { public void testLastValueWithUnboundedPrecedingLowerAndUnboundedFollowingUpper( WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, Integer.MIN_VALUE, Integer.MAX_VALUE, getLastValue(new RexExpression.InputRef(1)), @@ -1926,13 +1781,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 15}, @@ -1944,14 +1797,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testLastValueWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, -2, getLastValue(new RexExpression.InputRef(1)), 
@@ -1963,13 +1814,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, null}, @@ -1981,14 +1830,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testLastValueWithUnboundedPrecedingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, 2, getLastValue(new RexExpression.InputRef(1)), @@ -2000,13 +1847,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 20}, @@ -2018,14 +1863,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testLastValueWithCurrentRowLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, Integer.MAX_VALUE, getLastValue(new RexExpression.InputRef(1)), @@ 
-2037,13 +1880,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 15}, @@ -2055,14 +1896,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test(dataProvider = "windowFrameTypes") public void testLastValueWithCurrentRowLowerAndCurrentRowUpper(WindowNode.WindowFrameType frameType) { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, 0, getLastValue(new RexExpression.InputRef(1)), @@ -2074,13 +1913,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 14}, @@ -2092,14 +1929,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 10}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testLastValueWithCurrentRowLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 2, getLastValue(new RexExpression.InputRef(1)), @@ -2111,13 +1946,11 @@ public class 
WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 20}, @@ -2129,14 +1962,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testLastValueWithOffsetPrecedingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 2, getLastValue(new RexExpression.InputRef(1)), @@ -2148,13 +1979,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 20}, @@ -2166,14 +1995,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20}, new Object[]{"B", 20, 2005, 20} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testLastValueWithOffsetPrecedingLowerAndOffsetPrecedingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -2, -1, getLastValue(new RexExpression.InputRef(1)), @@ -2185,13 +2012,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: 
List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, null}, @@ -2203,14 +2028,12 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, null}, new Object[]{"B", 20, 2005, 10} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test public void testLastValueWithOffsetFollowingLowerAndOffsetFollowingUpper() { // Given: - //@formatter:off WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 1, 3, getLastValue(new RexExpression.InputRef(1)), @@ -2222,13 +2045,11 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000}, new Object[]{"B", 20, 2005} }); - //@formatter:on // When: List<Object[]> resultRows = operator.nextBlock().getContainer(); // Then: - //@formatter:off verifyResultRows(resultRows, List.of(0), Map.of( "A", List.of( new Object[]{"A", 14, 2000, 15}, @@ -2240,119 +2061,820 @@ public class WindowAggregateOperatorTest { new Object[]{"B", 10, 2000, 20}, new Object[]{"B", 20, 2005, null} ))); - //@formatter:on assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } - private WindowAggregateOperator prepareDataForWindowFunction(String[] inputSchemaCols, - ColumnDataType[] inputSchemaColTypes, ColumnDataType outputType, List<Integer> partitionKeys, - int collationFieldIndex, WindowNode.WindowFrameType frameType, int windowFrameLowerBound, - int windowFrameUpperBound, RexExpression.FunctionCall functionCall, Object[][] rows) { - DataSchema inputSchema = new DataSchema(inputSchemaCols, inputSchemaColTypes); - when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, rows)) - 
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); + @Test(dataProvider = "windowFrameTypes") + public void testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndCurrentRowUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, Integer.MIN_VALUE, 0, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", 20, 2005} + }); - String[] outputSchemaCols = new String[inputSchemaCols.length + 1]; - System.arraycopy(inputSchemaCols, 0, outputSchemaCols, 0, inputSchemaCols.length); - outputSchemaCols[inputSchemaCols.length] = functionCall.getFunctionName().toLowerCase(); + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); - ColumnDataType[] outputSchemaColTypes = new ColumnDataType[inputSchemaColTypes.length + 1]; - System.arraycopy(inputSchemaColTypes, 0, outputSchemaColTypes, 0, inputSchemaColTypes.length); - outputSchemaColTypes[inputSchemaCols.length] = outputType; + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, null}, + new Object[]{"A", null, 2002, frameType == ROWS ? 
null : 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", 20, 2008, 10}, + new Object[]{"A", null, 2008, 10} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", 20, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } - DataSchema resultSchema = new DataSchema(outputSchemaCols, outputSchemaColTypes); - List<RexExpression.FunctionCall> aggCalls = List.of(functionCall); - List<RelFieldCollation> collations = List.of(new RelFieldCollation(collationFieldIndex)); - return getOperator(inputSchema, resultSchema, partitionKeys, collations, aggCalls, frameType, windowFrameLowerBound, - windowFrameUpperBound); + @Test(dataProvider = "windowFrameTypes") + public void testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndUnboundedFollowingUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, Integer.MIN_VALUE, Integer.MAX_VALUE, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", 20, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, 10}, + new Object[]{"A", null, 2002, 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", 20, 2008, 10}, + new Object[]{"A", null, 2008, 10} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", 20, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test - public void 
testShouldThrowOnWindowFrameWithInvalidOffsetBounds() { + public void testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() { // Given: - DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING}); - when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{2, "foo"})) - .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); - DataSchema resultSchema = - new DataSchema(new String[]{"group", "arg", "sum"}, new ColumnDataType[]{INT, STRING, DOUBLE}); - List<Integer> keys = List.of(0); - List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new RexExpression.InputRef(1))); + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, -1, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); - // Then: - IllegalStateException e = Assert.expectThrows(IllegalStateException.class, - () -> getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, ROWS, 5, 2)); - assertEquals(e.getMessage(), "Window frame lower bound can't be greater than upper bound"); + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); - e = Assert.expectThrows(IllegalStateException.class, - () -> getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, ROWS, -2, -3)); - assertEquals(e.getMessage(), "Window frame lower bound can't be greater than upper bound"); + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, null}, + new Object[]{"A", null, 2002, null}, + new Object[]{"A", 10, 2002, null}, + new Object[]{"A", 
20, 2008, 10}, + new Object[]{"A", null, 2008, 10} + ), + "B", List.of( + new Object[]{"B", 10, 2000, null}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } @Test - public void testShouldThrowOnWindowFrameWithOffsetBoundsForRange() { - // TODO: Remove this test when support for RANGE window frames with offset PRECEDING / FOLLOWING is added + public void testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetFollowingUpper() { // Given: - DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING}); - when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{2, "foo"})) - .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); - DataSchema resultSchema = - new DataSchema(new String[]{"group", "arg", "sum"}, new ColumnDataType[]{INT, STRING, DOUBLE}); - List<Integer> keys = List.of(0); - List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new RexExpression.InputRef(1))); + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, 1, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); - // Then: - IllegalStateException e = Assert.expectThrows(IllegalStateException.class, - () -> getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, WindowNode.WindowFrameType.RANGE, 5, - Integer.MAX_VALUE)); - assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); - e = 
Assert.expectThrows(IllegalStateException.class, - () -> getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, WindowNode.WindowFrameType.RANGE, - Integer.MAX_VALUE, 5)); - assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, null}, + new Object[]{"A", null, 2002, 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", 20, 2008, 10}, + new Object[]{"A", null, 2008, 10} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } - private WindowAggregateOperator getOperator(DataSchema inputSchema, DataSchema resultSchema, List<Integer> keys, - List<RelFieldCollation> collations, List<RexExpression.FunctionCall> aggCalls, - WindowNode.WindowFrameType windowFrameType, int lowerBound, int upperBound, PlanNode.NodeHint nodeHint) { - return new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), _input, inputSchema, - new WindowNode(-1, resultSchema, nodeHint, List.of(), keys, collations, aggCalls, windowFrameType, lowerBound, - upperBound, List.of())); - } + @Test(dataProvider = "windowFrameTypes") + public void testFirstValueIgnoreNullsWithCurrentRowLowerAndUnboundedFollowingUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, Integer.MAX_VALUE, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); - private 
WindowAggregateOperator getOperator(DataSchema inputSchema, DataSchema resultSchema, List<Integer> keys, - List<RelFieldCollation> collations, List<RexExpression.FunctionCall> aggCalls, - WindowNode.WindowFrameType windowFrameType, int lowerBound, int upperBound) { - return getOperator(inputSchema, resultSchema, keys, collations, aggCalls, windowFrameType, lowerBound, upperBound, - PlanNode.NodeHint.EMPTY); - } + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); - private static RexExpression.FunctionCall getSum(RexExpression arg) { - return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.SUM.name(), List.of(arg)); + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, 10}, + new Object[]{"A", null, 2002, 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", 20, 2008, 20}, + new Object[]{"A", null, 2008, frameType == ROWS ? null : 20} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, null} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } - private static RexExpression.FunctionCall getMin(RexExpression arg) { - return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.MIN.name(), List.of(arg)); - } + @Test(dataProvider = "windowFrameTypes") + public void testFirstValueIgnoreNullsWithCurrentRowLowerAndCurrentRowUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, 0, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); - private 
static RexExpression.FunctionCall getMax(RexExpression arg) { - return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.MAX.name(), List.of(arg)); - } + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); - private static RexExpression.FunctionCall getBoolAnd(RexExpression arg) { - return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLAND", List.of(arg)); + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, null}, + new Object[]{"A", null, 2002, frameType == ROWS ? null : 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", 20, 2008, 20}, + new Object[]{"A", null, 2008, frameType == ROWS ? null : 20} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, null} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } - private static RexExpression.FunctionCall getBoolOr(RexExpression arg) { - return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLOR", List.of(arg)); - } + @Test + public void testFirstValueIgnoreNullsWithCurrentRowLowerAndOffsetFollowingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 1, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); - private static RexExpression.FunctionCall getFirstValue(RexExpression arg) { - return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.FIRST_VALUE.name(), List.of(arg)); + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), 
Map.of( + "A", List.of( + new Object[]{"A", null, 2000, null}, + new Object[]{"A", null, 2002, 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", 20, 2008, 20}, + new Object[]{"A", null, 2008, null} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, null} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); } - private static RexExpression.FunctionCall getLastValue(RexExpression arg) { - return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LAST_VALUE.name(), List.of(arg)); + @Test + public void testFirstValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetFollowingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 1, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, null}, + new Object[]{"A", null, 2002, 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", 20, 2008, 10}, + new Object[]{"A", null, 2008, 20} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testFirstValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetPrecedingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, 
INT}, INT, List.of(0), 2, ROWS, -2, -1, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, null}, + new Object[]{"A", null, 2002, null}, + new Object[]{"A", 10, 2002, null}, + new Object[]{"A", 20, 2008, 10}, + new Object[]{"A", null, 2008, 10} + ), + "B", List.of( + new Object[]{"B", 10, 2000, null}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testFirstValueIgnoreNullsWithOffsetFollowingLowerAndOffsetFollowingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 1, 3, + getFirstValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", null, 2000}, + new Object[]{"A", null, 2002}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 20, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", null, 2000, 10}, + new Object[]{"A", null, 2002, 10}, + new Object[]{"A", 10, 2002, 20}, + new Object[]{"A", 20, 2008, null}, + new Object[]{"A", null, 2008, null} + ), + "B", List.of( + new Object[]{"B", 10, 2000, null}, + new Object[]{"B", null, 2005, null} + ))); + 
assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test(dataProvider = "windowFrameTypes") + public void testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndCurrentRowUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, Integer.MIN_VALUE, 0, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 14}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", null, 2008, frameType == ROWS ? 
10 : 15}, + new Object[]{"A", 15, 2008, 15} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test(dataProvider = "windowFrameTypes") + public void testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndUnboundedFollowingUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, Integer.MIN_VALUE, Integer.MAX_VALUE, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 15, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 15}, + new Object[]{"A", 10, 2002, 15}, + new Object[]{"A", 15, 2008, 15}, + new Object[]{"A", null, 2008, 15} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, -1, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", 
null, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, null}, + new Object[]{"A", 10, 2002, 14}, + new Object[]{"A", null, 2008, 10}, + new Object[]{"A", 15, 2008, 10} + ), + "B", List.of( + new Object[]{"B", 10, 2000, null}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetFollowingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, Integer.MIN_VALUE, 2, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 10}, + new Object[]{"A", 10, 2002, 15}, + new Object[]{"A", null, 2008, 15}, + new Object[]{"A", 15, 2008, 15} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test(dataProvider = "windowFrameTypes") + public void testLastValueIgnoreNullsWithCurrentRowLowerAndUnboundedFollowingUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, 
Integer.MAX_VALUE, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", 15, 2008}, + new Object[]{"A", null, 2008}, + new Object[]{"A", null, 2010}, + new Object[]{"B", null, 2000}, + new Object[]{"B", 10, 2005} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 15}, + new Object[]{"A", 10, 2002, 15}, + new Object[]{"A", 15, 2008, 15}, + new Object[]{"A", null, 2008, frameType == ROWS ? null : 15}, + new Object[]{"A", null, 2010, null} + ), + "B", List.of( + new Object[]{"B", null, 2000, 10}, + new Object[]{"B", 10, 2005, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test(dataProvider = "windowFrameTypes") + public void testLastValueIgnoreNullsWithCurrentRowLowerAndCurrentRowUpper( + WindowNode.WindowFrameType frameType) { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 0, 0, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2000}, + new Object[]{"B", null, 2008} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 14}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", null, 2008, frameType == ROWS ? null : 15}, + new Object[]{"A", 15, 2008, 15} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2000, frameType == ROWS ? 
null : 10}, + new Object[]{"B", null, 2008, null} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testLastValueIgnoreNullsWithCurrentRowLowerAndOffsetFollowingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 1, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2008} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 10}, + new Object[]{"A", 10, 2002, 10}, + new Object[]{"A", null, 2008, 15}, + new Object[]{"A", 15, 2008, 15} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2008, null} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testLastValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetFollowingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 1, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2008} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 10}, + new 
Object[]{"A", 10, 2002, 10}, + new Object[]{"A", null, 2008, 15}, + new Object[]{"A", 15, 2008, 15} + ), + "B", List.of( + new Object[]{"B", 10, 2000, 10}, + new Object[]{"B", null, 2008, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testLastValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetPrecedingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -2, -1, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2008} + }); + + // When: + List<Object[]> resultRows = operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, null}, + new Object[]{"A", 10, 2002, 14}, + new Object[]{"A", null, 2008, 10}, + new Object[]{"A", 15, 2008, 10} + ), + "B", List.of( + new Object[]{"B", 10, 2000, null}, + new Object[]{"B", null, 2008, 10} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + @Test + public void testLastValueIgnoreNullsWithOffsetFollowingLowerAndOffsetFollowingUpper() { + // Given: + WindowAggregateOperator operator = prepareDataForWindowFunction(new String[]{"name", "value", "year"}, + new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 1, 2, + getLastValue(new RexExpression.InputRef(1), true), + new Object[][]{ + new Object[]{"A", 14, 2000}, + new Object[]{"A", 10, 2002}, + new Object[]{"A", null, 2008}, + new Object[]{"A", 15, 2008}, + new Object[]{"B", 10, 2000}, + new Object[]{"B", null, 2008} + }); + + // When: + List<Object[]> resultRows = 
operator.nextBlock().getContainer(); + + // Then: + verifyResultRows(resultRows, List.of(0), Map.of( + "A", List.of( + new Object[]{"A", 14, 2000, 10}, + new Object[]{"A", 10, 2002, 15}, + new Object[]{"A", null, 2008, 15}, + new Object[]{"A", 15, 2008, null} + ), + "B", List.of( + new Object[]{"B", 10, 2000, null}, + new Object[]{"B", null, 2008, null} + ))); + assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second block is EOS (done processing)"); + } + + private WindowAggregateOperator prepareDataForWindowFunction(String[] inputSchemaCols, + ColumnDataType[] inputSchemaColTypes, ColumnDataType outputType, List<Integer> partitionKeys, + int collationFieldIndex, WindowNode.WindowFrameType frameType, int windowFrameLowerBound, + int windowFrameUpperBound, RexExpression.FunctionCall functionCall, Object[][] rows) { + DataSchema inputSchema = new DataSchema(inputSchemaCols, inputSchemaColTypes); + when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, rows)) + .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); + + String[] outputSchemaCols = new String[inputSchemaCols.length + 1]; + System.arraycopy(inputSchemaCols, 0, outputSchemaCols, 0, inputSchemaCols.length); + outputSchemaCols[inputSchemaCols.length] = functionCall.getFunctionName().toLowerCase(); + + ColumnDataType[] outputSchemaColTypes = new ColumnDataType[inputSchemaColTypes.length + 1]; + System.arraycopy(inputSchemaColTypes, 0, outputSchemaColTypes, 0, inputSchemaColTypes.length); + outputSchemaColTypes[inputSchemaCols.length] = outputType; + + DataSchema resultSchema = new DataSchema(outputSchemaCols, outputSchemaColTypes); + List<RexExpression.FunctionCall> aggCalls = List.of(functionCall); + List<RelFieldCollation> collations = List.of(new RelFieldCollation(collationFieldIndex)); + return getOperator(inputSchema, resultSchema, partitionKeys, collations, aggCalls, frameType, windowFrameLowerBound, + windowFrameUpperBound); + } + + @Test + public 
void testShouldThrowOnWindowFrameWithInvalidOffsetBounds() { + // Given: + DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING}); + when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{2, "foo"})) + .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); + DataSchema resultSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new ColumnDataType[]{INT, STRING, DOUBLE}); + List<Integer> keys = List.of(0); + List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new RexExpression.InputRef(1))); + + // Then: + IllegalStateException e = Assert.expectThrows(IllegalStateException.class, + () -> getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, ROWS, 5, 2)); + assertEquals(e.getMessage(), "Window frame lower bound can't be greater than upper bound"); + + e = Assert.expectThrows(IllegalStateException.class, + () -> getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, ROWS, -2, -3)); + assertEquals(e.getMessage(), "Window frame lower bound can't be greater than upper bound"); + } + + @Test + public void testShouldThrowOnWindowFrameWithOffsetBoundsForRange() { + // TODO: Remove this test when support for RANGE window frames with offset PRECEDING / FOLLOWING is added + // Given: + DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, STRING}); + when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{2, "foo"})) + .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0)); + DataSchema resultSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new ColumnDataType[]{INT, STRING, DOUBLE}); + List<Integer> keys = List.of(0); + List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new RexExpression.InputRef(1))); + + // Then: + IllegalStateException e = Assert.expectThrows(IllegalStateException.class, + () -> getOperator(inputSchema, 
resultSchema, keys, List.of(), aggCalls, WindowNode.WindowFrameType.RANGE, 5, + Integer.MAX_VALUE)); + assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); + + e = Assert.expectThrows(IllegalStateException.class, + () -> getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls, WindowNode.WindowFrameType.RANGE, + Integer.MAX_VALUE, 5)); + assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); + } + + private WindowAggregateOperator getOperator(DataSchema inputSchema, DataSchema resultSchema, List<Integer> keys, + List<RelFieldCollation> collations, List<RexExpression.FunctionCall> aggCalls, + WindowNode.WindowFrameType windowFrameType, int lowerBound, int upperBound, PlanNode.NodeHint nodeHint) { + return new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), _input, inputSchema, + new WindowNode(-1, resultSchema, nodeHint, List.of(), keys, collations, aggCalls, windowFrameType, lowerBound, + upperBound, List.of())); + } + + private WindowAggregateOperator getOperator(DataSchema inputSchema, DataSchema resultSchema, List<Integer> keys, + List<RelFieldCollation> collations, List<RexExpression.FunctionCall> aggCalls, + WindowNode.WindowFrameType windowFrameType, int lowerBound, int upperBound) { + return getOperator(inputSchema, resultSchema, keys, collations, aggCalls, windowFrameType, lowerBound, upperBound, + PlanNode.NodeHint.EMPTY); + } + + private static RexExpression.FunctionCall getSum(RexExpression arg) { + return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.SUM.name(), List.of(arg)); + } + + private static RexExpression.FunctionCall getMin(RexExpression arg) { + return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.MIN.name(), List.of(arg)); + } + + private static RexExpression.FunctionCall getMax(RexExpression arg) { + return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.MAX.name(), List.of(arg)); + } + 
+ private static RexExpression.FunctionCall getBoolAnd(RexExpression arg) { + return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLAND", List.of(arg)); + } + + private static RexExpression.FunctionCall getBoolOr(RexExpression arg) { + return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLOR", List.of(arg)); + } + + private static RexExpression.FunctionCall getFirstValue(RexExpression arg) { + return getFirstValue(arg, false); + } + + private static RexExpression.FunctionCall getFirstValue(RexExpression arg, boolean ignoreNulls) { + return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.FIRST_VALUE.name(), List.of(arg), false, + ignoreNulls); + } + + private static RexExpression.FunctionCall getLastValue(RexExpression arg) { + return getLastValue(arg, false); + } + + private static RexExpression.FunctionCall getLastValue(RexExpression arg, boolean ignoreNulls) { + return new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LAST_VALUE.name(), List.of(arg), false, + ignoreNulls); } private static void verifyResultRows(List<Object[]> resultRows, List<Integer> keys, diff --git a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json index dd25411d26..c1cbe9cb0d 100644 --- a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json +++ b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json @@ -2388,28 +2388,32 @@ {"name": "int_col", "type": "INT"}, {"name": "double_col", "type": "DOUBLE"}, {"name": "string_col", "type": "STRING"}, - {"name": "bool_col", "type": "BOOLEAN"} + {"name": "bool_col", "type": "BOOLEAN"}, + {"name": "nullable_int_col", "type": "INT"} ], "inputs": [ - [2, 300, "a", true], - [2, 400, "a", true], - [3, 100, "b", false], - [3, 100, "c", true], - [100, 1, "b", false], - [42, 50.5, "e", true], - [42, 42, "d", false], - [42, 75, "a", true], - [42, 42, "a", false], - [42, 50.5, "a", true], - [42, 42, "e", 
false], - [-101, 1.01, "c", false], - [150, 1.5, "c", false], - [150, -1.53, "h", false], - [3, 100, "g", true], - [2, 400, "c", false] + [2, 300, "a", true, 1], + [2, 400, "a", true, null], + [3, 100, "b", false, null], + [3, 100, "c", true, 3], + [100, 1, "b", false, 1], + [42, 50.5, "e", true, 2], + [42, 42, "d", false, null], + [42, 75, "a", true, 5], + [42, 42, "a", false, 4], + [42, 50.5, "a", true, null], + [42, 42, "e", false, null], + [-101, 1.01, "c", false, 7], + [150, 1.5, "c", false, 6], + [150, -1.53, "h", false, null], + [3, 100, "g", true, 10], + [2, 400, "c", false, null] ] } }, + "extraProps": { + "enableColumnBasedNullHandling": true + }, "queries": [ { "description": "Single OVER(PARTITION BY) sum", @@ -5346,6 +5350,116 @@ [3, "c", 3], [4, "c", 150] ] + }, + { + "description": "FIRST_VALUE with unbounded window frame", + "sql": "SELECT string_col, double_col, nullable_int_col, FIRST_VALUE(nullable_int_col) OVER(PARTITION BY string_col ORDER BY double_col RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM {tbl} ORDER BY string_col", + "outputs": [ + ["a", 42.0, 4, 4], + ["a", 50.5, null, 4], + ["a", 75.0, 5, 4], + ["a", 300.0, 1, 4], + ["a", 400.0, null, 4], + ["b", 1.0, 1, 1], + ["b", 100.0, null, 1], + ["c", 1.01, 7, 7], + ["c", 1.5, 6, 7], + ["c", 100.0, 3, 7], + ["c", 400.0, null, 7], + ["d", 42.0, null, null], + ["e", 42.0, null, null], + ["e", 50.5, 2, null], + ["g", 100.0, 10, 10], + ["h", -1.53, null, null] + ] + }, + { + "description": "FIRST_VALUE with IGNORE NULLS and default window frame", + "sql": "SELECT string_col, double_col, nullable_int_col, FIRST_VALUE(nullable_int_col) IGNORE NULLS OVER(PARTITION BY string_col ORDER BY double_col) FROM {tbl} ORDER BY string_col", + "outputs": [ + ["a", 42.0, 4, 4], + ["a", 50.5, null, 4], + ["a", 75.0, 5, 4], + ["a", 300.0, 1, 4], + ["a", 400.0, null, 4], + ["b", 1.0, 1, 1], + ["b", 100.0, null, 1], + ["c", 1.01, 7, 7], + ["c", 1.5, 6, 7], + ["c", 100.0, 3, 7], + ["c", 400.0, 
null, 7], + ["d", 42.0, null, null], + ["e", 42.0, null, null], + ["e", 50.5, 2, 2], + ["g", 100.0, 10, 10], + ["h", -1.53, null, null] + ] + }, + { + "description": "LAST_VALUE with RANGE BETWEEN CURRENT ROW AND CURRENT ROW window frame", + "sql": "SELECT string_col, double_col, nullable_int_col, LAST_VALUE(nullable_int_col) OVER(PARTITION BY string_col ORDER BY double_col RANGE BETWEEN CURRENT ROW AND CURRENT ROW) FROM {tbl} ORDER BY string_col", + "outputs": [ + ["a", 42.0, 4, 4], + ["a", 50.5, null, null], + ["a", 75.0, 5, 5], + ["a", 300.0, 1, 1], + ["a", 400.0, null, null], + ["b", 1.0, 1, 1], + ["b", 100.0, null, null], + ["c", 1.01, 7, 7], + ["c", 1.5, 6, 6], + ["c", 100.0, 3, 3], + ["c", 400.0, null, null], + ["d", 42.0, null, null], + ["e", 42.0, null, null], + ["e", 50.5, 2, 2], + ["g", 100.0, 10, 10], + ["h", -1.53, null, null] + ] + }, + { + "description": "LAST_VALUE with IGNORE NULLS and bounded ROWS window frame", + "sql": "SELECT string_col, double_col, nullable_int_col, LAST_VALUE(nullable_int_col) IGNORE NULLS OVER(PARTITION BY string_col ORDER BY double_col ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM {tbl} ORDER BY string_col", + "outputs": [ + ["a", 42.0, 4, 4], + ["a", 50.5, null, 5], + ["a", 75.0, 5, 1], + ["a", 300.0, 1, 1], + ["a", 400.0, null, 1], + ["b", 1.0, 1, 1], + ["b", 100.0, null, 1], + ["c", 1.01, 7, 6], + ["c", 1.5, 6, 3], + ["c", 100.0, 3, 3], + ["c", 400.0, null, 3], + ["d", 42.0, null, null], + ["e", 42.0, null, 2], + ["e", 50.5, 2, 2], + ["g", 100.0, 10, 10], + ["h", -1.53, null, null] + ] + }, + { + "description": "LAST_VALUE with RESPECT NULLS and bounded ROWS window frame", + "sql": "SELECT string_col, double_col, nullable_int_col, LAST_VALUE(nullable_int_col) RESPECT NULLS OVER(PARTITION BY string_col ORDER BY double_col ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING) FROM {tbl} ORDER BY string_col", + "outputs": [ + ["a", 42.0, 4, null], + ["a", 50.5, null, 4], + ["a", 75.0, 5, null], + ["a", 300.0, 1, 5], + ["a", 400.0, 
null, 1], + ["b", 1.0, 1, null], + ["b", 100.0, null, 1], + ["c", 1.01, 7, null], + ["c", 1.5, 6, 7], + ["c", 100.0, 3, 6], + ["c", 400.0, null, 3], + ["d", 42.0, null, null], + ["e", 42.0, null, null], + ["e", 50.5, 2, null], + ["g", 100.0, 10, null], + ["h", -1.53, null, null] + ] } ] } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org