This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fd21f28cb Add IGNORE NULLS option to FIRST_VALUE and LAST_VALUE 
window functions (#14264)
6fd21f28cb is described below

commit 6fd21f28cb6214668c5724d1ca4062b4a48ee19b
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Tue Oct 29 04:59:46 2024 +0530

    Add IGNORE NULLS option to FIRST_VALUE and LAST_VALUE window functions 
(#14264)
---
 .../pinot/common/collections/DualValueList.java    |   54 +
 pinot-common/src/main/proto/expressions.proto      |    1 +
 .../tests/NullHandlingIntegrationTest.java         |   25 +
 .../pinot/calcite/sql/fun/PinotOperatorTable.java  |   41 +-
 .../planner/logical/PlanNodeToRelConverter.java    |    2 +-
 .../pinot/query/planner/logical/RexExpression.java |   11 +-
 .../query/planner/logical/RexExpressionUtils.java  |    4 +-
 .../serde/ProtoExpressionToRexExpression.java      |    2 +-
 .../serde/RexExpressionToProtoExpression.java      |   10 +-
 .../window/aggregate/AggregateWindowFunction.java  |    2 +-
 .../window/value/FirstValueWindowFunction.java     |  186 +++-
 .../window/value/LastValueWindowFunction.java      |  175 +++-
 .../window/value/LeadValueWindowFunction.java      |    4 +-
 .../operator/window/value/ValueWindowFunction.java |   10 +
 .../operator/WindowAggregateOperatorTest.java      | 1046 +++++++++++++++-----
 .../test/resources/queries/WindowFunctions.json    |  148 ++-
 16 files changed, 1400 insertions(+), 321 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/collections/DualValueList.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/collections/DualValueList.java
new file mode 100644
index 0000000000..6db8c63baa
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/collections/DualValueList.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.collections;
+
+import java.util.AbstractList;
+
+
+/**
+ * An immutable list like the one returned by {@link 
java.util.Collections#nCopies(int, Object)}, but with two values
+ * (that are not interleaved) instead of a single one. Useful for avoiding 
unnecessary allocations.
+ */
+public class DualValueList<E> extends AbstractList<E> {
+
+  private final E _firstValue;
+  private final E _secondValue;
+  private final int _firstCount;
+  private final int _totalSize;
+
+  public DualValueList(E firstValue, int firstCount, E secondValue, int 
secondCount) {
+    _firstValue = firstValue;
+    _firstCount = firstCount;
+    _secondValue = secondValue;
+    _totalSize = firstCount + secondCount;
+  }
+
+  @Override
+  public E get(int index) {
+    if (index < 0 || index >= _totalSize) {
+      throw new IndexOutOfBoundsException(index);
+    }
+    return index < _firstCount ? _firstValue : _secondValue;
+  }
+
+  @Override
+  public int size() {
+    return _totalSize;
+  }
+}
diff --git a/pinot-common/src/main/proto/expressions.proto 
b/pinot-common/src/main/proto/expressions.proto
index 83ce64f265..e402bd97c8 100644
--- a/pinot-common/src/main/proto/expressions.proto
+++ b/pinot-common/src/main/proto/expressions.proto
@@ -92,6 +92,7 @@ message FunctionCall {
   string functionName = 2;
   repeated Expression functionOperands = 3;
   bool isDistinct = 4;
+  bool ignoreNulls = 5;
 }
 
 message Expression {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index cf3911a32e..ad673f2e50 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -326,6 +326,31 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet {
     assertTrue(response.get("resultTable").get("rows").get(0).get(0).isNull());
   }
 
+  @Test
+  public void testWindowFunctionIgnoreNulls()
+      throws Exception {
+    // Window functions are only supported in the multi-stage query engine
+    setUseMultiStageQueryEngine(true);
+    String sqlQuery =
+        "SELECT salary, LAST_VALUE(salary) IGNORE NULLS OVER (ORDER BY 
DaysSinceEpoch) AS gapfilledSalary from "
+            + "mytable";
+    JsonNode response = postQuery(sqlQuery);
+    assertNoError(response);
+
+    // Check if the LAST_VALUE window function with IGNORE NULLS has 
effectively gap-filled the salary values
+    Integer lastSalary = null;
+    JsonNode rows = response.get("resultTable").get("rows");
+    for (int i = 0; i < rows.size(); i++) {
+      JsonNode row = rows.get(i);
+      if (!row.get(0).isNull()) {
+        assertEquals(row.get(0).asInt(), row.get(1).asInt());
+        lastSalary = row.get(0).asInt();
+      } else {
+        assertEquals(lastSalary, row.get(1).numberValue());
+      }
+    }
+  }
+
   @Override
   protected void overrideBrokerConf(PinotConfiguration brokerConf) {
     
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING,
 "true");
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
index 94bd38e8e7..d549ca4149 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
@@ -32,6 +32,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlLeadLagAggFunction;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
@@ -134,10 +135,6 @@ public class PinotOperatorTable implements 
SqlOperatorTable {
       SqlStdOperatorTable.MODE,
       SqlStdOperatorTable.MIN,
       SqlStdOperatorTable.MAX,
-      SqlStdOperatorTable.LAST_VALUE,
-      SqlStdOperatorTable.FIRST_VALUE,
-      SqlStdOperatorTable.LEAD,
-      SqlStdOperatorTable.LAG,
       SqlStdOperatorTable.AVG,
       SqlStdOperatorTable.STDDEV_POP,
       SqlStdOperatorTable.COVAR_POP,
@@ -152,7 +149,17 @@ public class PinotOperatorTable implements 
SqlOperatorTable {
       SqlStdOperatorTable.RANK,
       SqlStdOperatorTable.ROW_NUMBER,
 
+      // WINDOW Functions (non-aggregate)
+      SqlStdOperatorTable.LAST_VALUE,
+      SqlStdOperatorTable.FIRST_VALUE,
+      // TODO: Replace these with SqlStdOperatorTable.LEAD and 
SqlStdOperatorTable.LAG when the function implementations
+      // are updated to support the IGNORE NULLS option.
+      PinotLeadWindowFunction.INSTANCE,
+      PinotLagWindowFunction.INSTANCE,
+
       // SPECIAL OPERATORS
+      SqlStdOperatorTable.IGNORE_NULLS,
+      SqlStdOperatorTable.RESPECT_NULLS,
       SqlStdOperatorTable.BETWEEN,
       SqlStdOperatorTable.SYMMETRIC_BETWEEN,
       SqlStdOperatorTable.NOT_BETWEEN,
@@ -372,4 +379,30 @@ public class PinotOperatorTable implements 
SqlOperatorTable {
   public List<SqlOperator> getOperatorList() {
     return _operatorList;
   }
+
+  private static class PinotLeadWindowFunction extends SqlLeadLagAggFunction {
+    static final SqlOperator INSTANCE = new PinotLeadWindowFunction();
+
+    public PinotLeadWindowFunction() {
+      super(SqlKind.LEAD);
+    }
+
+    @Override
+    public boolean allowsNullTreatment() {
+      return false;
+    }
+  }
+
+  private static class PinotLagWindowFunction extends SqlLeadLagAggFunction {
+    static final SqlOperator INSTANCE = new PinotLagWindowFunction();
+
+    public PinotLagWindowFunction() {
+      super(SqlKind.LAG);
+    }
+
+    @Override
+    public boolean allowsNullTreatment() {
+      return false;
+    }
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
index 13bbd791b2..bc6ba26884 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
@@ -324,7 +324,7 @@ public final class PlanNodeToRelConverter {
           RelDataType relDataType = 
funCall.getDataType().toType(_builder.getTypeFactory());
           Window.RexWinAggCall winCall = new Window.RexWinAggCall(aggFunction, 
relDataType, operands, aggCalls.size(),
               // same as the one used in LogicalWindow.create
-              funCall.isDistinct(), false);
+              funCall.isDistinct(), funCall.isIgnoreNulls());
           aggCalls.add(winCall);
         }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index 3419fdc5bb..0df030f197 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -111,17 +111,20 @@ public interface RexExpression {
     private final List<RexExpression> _functionOperands;
     // whether the function is a distinct function.
     private final boolean _isDistinct;
+    // whether the function should ignore nulls (relevant to certain window 
functions like LAST_VALUE).
+    private final boolean _ignoreNulls;
 
     public FunctionCall(ColumnDataType dataType, String functionName, 
List<RexExpression> functionOperands) {
-      this(dataType, functionName, functionOperands, false);
+      this(dataType, functionName, functionOperands, false, false);
     }
 
     public FunctionCall(ColumnDataType dataType, String functionName, 
List<RexExpression> functionOperands,
-        boolean isDistinct) {
+        boolean isDistinct, boolean ignoreNulls) {
       _dataType = dataType;
       _functionName = functionName;
       _functionOperands = functionOperands;
       _isDistinct = isDistinct;
+      _ignoreNulls = ignoreNulls;
     }
 
     public ColumnDataType getDataType() {
@@ -140,6 +143,10 @@ public interface RexExpression {
       return _isDistinct;
     }
 
+    public boolean isIgnoreNulls() {
+      return _ignoreNulls;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index b23e12f224..d9b4af5fca 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -405,12 +405,12 @@ public class RexExpressionUtils {
   public static RexExpression.FunctionCall fromAggregateCall(AggregateCall 
aggregateCall) {
     return new 
RexExpression.FunctionCall(RelToPlanNodeConverter.convertToColumnDataType(aggregateCall.type),
         getFunctionName(aggregateCall.getAggregation()), 
fromRexNodes(aggregateCall.rexList),
-        aggregateCall.isDistinct());
+        aggregateCall.isDistinct(), false);
   }
 
   public static RexExpression.FunctionCall 
fromWindowAggregateCall(Window.RexWinAggCall winAggCall) {
     return new 
RexExpression.FunctionCall(RelToPlanNodeConverter.convertToColumnDataType(winAggCall.type),
-        getFunctionName(winAggCall.op), fromRexNodes(winAggCall.operands), 
winAggCall.distinct);
+        getFunctionName(winAggCall.op), fromRexNodes(winAggCall.operands), 
winAggCall.distinct, winAggCall.ignoreNulls);
   }
 
   public static Integer getValueAsInt(@Nullable RexNode in) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java
index 0a5e9dc68c..cca846958e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoExpressionToRexExpression.java
@@ -58,7 +58,7 @@ public class ProtoExpressionToRexExpression {
       operands.add(convertExpression(protoOperand));
     }
     return new 
RexExpression.FunctionCall(convertColumnDataType(functionCall.getDataType()),
-        functionCall.getFunctionName(), operands, 
functionCall.getIsDistinct());
+        functionCall.getFunctionName(), operands, 
functionCall.getIsDistinct(), functionCall.getIgnoreNulls());
   }
 
   public static RexExpression.Literal convertLiteral(Expressions.Literal 
literal) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java
index a7304299a2..2053de0cc1 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/RexExpressionToProtoExpression.java
@@ -64,9 +64,13 @@ public class RexExpressionToProtoExpression {
     for (RexExpression operand : operands) {
       protoOperands.add(convertExpression(operand));
     }
-    return 
Expressions.FunctionCall.newBuilder().setDataType(convertColumnDataType(functionCall.getDataType()))
-        
.setFunctionName(functionCall.getFunctionName()).addAllFunctionOperands(protoOperands)
-        .setIsDistinct(functionCall.isDistinct()).build();
+    return Expressions.FunctionCall.newBuilder()
+        .setDataType(convertColumnDataType(functionCall.getDataType()))
+        .setFunctionName(functionCall.getFunctionName())
+        .addAllFunctionOperands(protoOperands)
+        .setIsDistinct(functionCall.isDistinct())
+        .setIgnoreNulls(functionCall.isIgnoreNulls())
+        .build();
   }
 
   public static Expressions.Literal convertLiteral(RexExpression.Literal 
literal) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java
index 835297a1b6..2bb7d6adff 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/aggregate/AggregateWindowFunction.java
@@ -83,7 +83,7 @@ public class AggregateWindowFunction extends WindowFunction {
     List<Object> result = new ArrayList<>(numRows);
     for (int i = 0; i < numRows; i++) {
       if (lowerBound >= numRows) {
-        // Fill the remaining rows with null
+        // Fill the remaining rows with null since all subsequent windows will 
be out of bounds
         for (int j = i; j < numRows; j++) {
           result.add(null);
         }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java
index 78dc5b198f..a1f84be32e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/FirstValueWindowFunction.java
@@ -20,11 +20,12 @@ package 
org.apache.pinot.query.runtime.operator.window.value;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.collections.DualValueList;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -42,15 +43,23 @@ public class FirstValueWindowFunction extends 
ValueWindowFunction {
   @Override
   public List<Object> processRows(List<Object[]> rows) {
     if (_windowFrame.isRowType()) {
-      return processRowsWindow(rows);
+      if (_ignoreNulls) {
+        return processRowsWindowIgnoreNulls(rows);
+      } else {
+        return processRowsWindow(rows);
+      }
     } else {
-      return processRangeWindow(rows);
+      if (_ignoreNulls) {
+        return processRangeWindowIgnoreNulls(rows);
+      } else {
+        return processRangeWindow(rows);
+      }
     }
   }
 
   private List<Object> processRowsWindow(List<Object[]> rows) {
     if (_windowFrame.isUnboundedPreceding() && _windowFrame.getUpperBound() >= 
0) {
-      return processUnboundedPreceding(rows);
+      return fillAllWithValue(rows, extractValueFromRow(rows.get(0)));
     }
 
     int numRows = rows.size();
@@ -62,7 +71,7 @@ public class FirstValueWindowFunction extends 
ValueWindowFunction {
 
     for (int i = 0; i < numRows; i++) {
       if (lowerBound >= numRows) {
-        // Fill remaining rows with null
+        // Fill the remaining rows with null since all subsequent windows will 
be out of bounds
         for (int j = i; j < numRows; j++) {
           result.add(null);
         }
@@ -78,12 +87,64 @@ public class FirstValueWindowFunction extends 
ValueWindowFunction {
       lowerBound++;
       upperBound = Math.min(upperBound + 1, numRows - 1);
     }
+
+    return result;
+  }
+
+  private List<Object> processRowsWindowIgnoreNulls(List<Object[]> rows) {
+    int numRows = rows.size();
+    int lowerBound = _windowFrame.getLowerBound();
+    int upperBound = Math.min(_windowFrame.getUpperBound(), numRows - 1);
+
+    // Find first non-null value in the first window
+    int indexOfFirstNonNullValue = indexOfFirstNonNullValueInWindow(rows, 
Math.max(lowerBound, 0), upperBound);
+    List<Object> result = new ArrayList<>(numRows);
+
+    for (int i = 0; i < numRows; i++) {
+      if (lowerBound >= numRows) {
+        // Fill the remaining rows with null since all subsequent windows will 
be out of bounds
+        for (int j = i; j < numRows; j++) {
+          result.add(null);
+        }
+        break;
+      }
+
+      if (indexOfFirstNonNullValue != -1) {
+        result.add(extractValueFromRow(rows.get(indexOfFirstNonNullValue)));
+      } else {
+        result.add(null);
+      }
+
+      // Slide the window forward by one row; check if 
indexOfFirstNonNullValue is the lower bound which will not be in
+      // the next window. If so, find the next non-null value.
+      if (lowerBound >= 0 && indexOfFirstNonNullValue == lowerBound) {
+        // Find first non-null value for the next window
+        indexOfFirstNonNullValue =
+            indexOfFirstNonNullValueInWindow(rows, lowerBound + 1, 
Math.min(upperBound + 1, numRows - 1));
+      }
+      lowerBound++;
+
+      // After the lower bound is updated, we also update the upper bound for 
the next window. The upper bound is only
+      // incremented if we're not already at the row boundary. If the upper 
bound is incremented, we also need to check
+      // if the new value being added into the window is non-null if the rest 
of the window has only null values (i.e.,
+      // if indexOfFirstNonNullValue is -1).
+      if (upperBound < numRows - 1) {
+        upperBound++;
+        if (indexOfFirstNonNullValue == -1 && upperBound >= 0) {
+          Object value = extractValueFromRow(rows.get(upperBound));
+          if (value != null) {
+            indexOfFirstNonNullValue = upperBound;
+          }
+        }
+      }
+    }
+
     return result;
   }
 
   private List<Object> processRangeWindow(List<Object[]> rows) {
     if (_windowFrame.isUnboundedPreceding()) {
-      return processUnboundedPreceding(rows);
+      return fillAllWithValue(rows, extractValueFromRow(rows.get(0)));
     }
 
     // The lower bound has to be CURRENT ROW since we don't support RANGE 
windows with offset value
@@ -113,12 +174,113 @@ public class FirstValueWindowFunction extends 
ValueWindowFunction {
     return result;
   }
 
-  private List<Object> processUnboundedPreceding(List<Object[]> rows) {
+  private List<Object> processRangeWindowIgnoreNulls(List<Object[]> rows) {
     int numRows = rows.size();
-    assert numRows > 0;
-    Object value = extractValueFromRow(rows.get(0));
-    Object[] result = new Object[numRows];
-    Arrays.fill(result, value);
-    return Arrays.asList(result);
+
+    if (_windowFrame.isUnboundedPreceding() && 
_windowFrame.isUnboundedFollowing()) {
+      // Find the first non-null value and fill it in all rows
+      int indexOfFirstNonNullValue = indexOfFirstNonNullValueInWindow(rows, 0, 
numRows - 1);
+      if (indexOfFirstNonNullValue == -1) {
+        // There's no non-null value
+        return Collections.nCopies(numRows, null);
+      } else {
+        return fillAllWithValue(rows, 
extractValueFromRow(rows.get(indexOfFirstNonNullValue)));
+      }
+    }
+
+    if (_windowFrame.isUnboundedPreceding() && 
_windowFrame.isUpperBoundCurrentRow()) {
+      // Find the first non-null value and fill it in all rows starting from 
the first row of the peer group of the row
+      // with the first non-null value
+      int firstNonNullValueIndex = indexOfFirstNonNullValueInWindow(rows, 0, 
numRows - 1);
+      Key firstNonNullValueKey;
+
+      // No non-null values
+      if (firstNonNullValueIndex == -1) {
+        return Collections.nCopies(numRows, null);
+      } else {
+        firstNonNullValueKey = 
AggregationUtils.extractRowKey(rows.get(firstNonNullValueIndex), _orderKeys);
+      }
+
+      // Find the start of the peer group of the row with the first non-null 
value
+      int nullEndIndex;
+      for (nullEndIndex = 0; nullEndIndex < numRows; nullEndIndex++) {
+        Object[] row = rows.get(nullEndIndex);
+        Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys);
+        if (orderKey.equals(firstNonNullValueKey)) {
+          break;
+        }
+      }
+
+      Object firstNonNullValue = 
extractValueFromRow(rows.get(firstNonNullValueIndex));
+      return new DualValueList<>(null, nullEndIndex, firstNonNullValue, 
numRows - nullEndIndex);
+    }
+
+    if (_windowFrame.isLowerBoundCurrentRow() && 
_windowFrame.isUpperBoundCurrentRow()) {
+      List<Object> result = new ArrayList<>(numRows);
+      Map<Key, Object> firstValueForKey = new HashMap<>();
+
+      for (Object[] row : rows) {
+        Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys);
+        Object value = extractValueFromRow(row);
+
+        if (value != null) {
+          firstValueForKey.putIfAbsent(orderKey, value);
+        }
+      }
+
+      for (Object[] row : rows) {
+        result.add(firstValueForKey.get(AggregationUtils.extractRowKey(row, 
_orderKeys)));
+      }
+
+      return result;
+    }
+
+    if (_windowFrame.isLowerBoundCurrentRow() && 
_windowFrame.isUnboundedFollowing()) {
+      List<Object> result = new ArrayList<>(numRows);
+      Map<Key, Object> firstValueForKey = new HashMap<>();
+
+      for (Object[] row : rows) {
+        Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys);
+        Object value = extractValueFromRow(row);
+
+        if (value != null) {
+          firstValueForKey.putIfAbsent(orderKey, value);
+        }
+      }
+
+      // Do a reverse iteration to get the first non-null value for each 
group. The first non-null value could either
+      // belong to the current group or any of the next groups if all the 
values in the current group are null.
+      Object prevNonNullValue = null;
+      for (int i = numRows - 1; i >= 0; i--) {
+        Object[] row = rows.get(i);
+        Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys);
+        Object value = firstValueForKey.get(orderKey);
+
+        if (value != null) {
+          prevNonNullValue = value;
+        }
+
+        result.add(prevNonNullValue);
+      }
+
+      Collections.reverse(result);
+      return result;
+    }
+
+    throw new IllegalStateException("RANGE window frame with offset PRECEDING 
/ FOLLOWING is not supported");
+  }
+
+  /**
+   * Both lowerBound and upperBound should be valid values for the given row 
set. The returned value is -1 if there is
+   * no non-null value in the window.
+   */
+  private int indexOfFirstNonNullValueInWindow(List<Object[]> rows, int 
lowerBound, int upperBound) {
+    for (int i = lowerBound; i <= upperBound; i++) {
+      Object value = extractValueFromRow(rows.get(i));
+      if (value != null) {
+        return i;
+      }
+    }
+    return -1;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java
index 9c70722f8e..5383525d2d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LastValueWindowFunction.java
@@ -20,12 +20,12 @@ package 
org.apache.pinot.query.runtime.operator.window.value;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.collections.DualValueList;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -43,18 +43,26 @@ public class LastValueWindowFunction extends 
ValueWindowFunction {
   @Override
   public List<Object> processRows(List<Object[]> rows) {
     if (_windowFrame.isRowType()) {
-      return processRowsWindow(rows);
+      if (_ignoreNulls) {
+        return processRowsWindowIgnoreNulls(rows);
+      } else {
+        return processRowsWindow(rows);
+      }
     } else {
-      return processRangeWindow(rows);
+      if (_ignoreNulls) {
+        return processRangeWindowIgnoreNulls(rows);
+      } else {
+        return processRangeWindow(rows);
+      }
     }
   }
 
   private List<Object> processRowsWindow(List<Object[]> rows) {
+    int numRows = rows.size();
     if (_windowFrame.isUnboundedFollowing() && _windowFrame.getLowerBound() <= 
0) {
-      return processUnboundedFollowing(rows);
+      return fillAllWithValue(rows, extractValueFromRow(rows.get(numRows - 
1)));
     }
 
-    int numRows = rows.size();
     List<Object> result = new ArrayList<>(numRows);
 
     // lowerBound is guaranteed to be less than or equal to upperBound here 
(but both can be -ve / +ve)
@@ -63,7 +71,7 @@ public class LastValueWindowFunction extends 
ValueWindowFunction {
 
     for (int i = 0; i < numRows; i++) {
       if (lowerBound >= numRows) {
-        // Fill remaining rows with null
+        // Fill the remaining rows with null since all subsequent windows will 
be out of bounds
         for (int j = i; j < numRows; j++) {
           result.add(null);
         }
@@ -83,16 +91,64 @@ public class LastValueWindowFunction extends 
ValueWindowFunction {
     return result;
   }
 
+  private List<Object> processRowsWindowIgnoreNulls(List<Object[]> rows) {
+    int numRows = rows.size();
+    int lowerBound = _windowFrame.getLowerBound();
+    int upperBound = Math.min(_windowFrame.getUpperBound(), numRows - 1);
+
+    // Find last non-null value in the first window
+    int indexOfLastNonNullValue = indexOfLastNonNullValueInWindow(rows, 
Math.max(0, lowerBound), upperBound);
+
+    List<Object> result = new ArrayList<>(numRows);
+    for (int i = 0; i < numRows; i++) {
+      if (lowerBound >= numRows) {
+        // Fill the remaining rows with null since all subsequent windows will 
be out of bounds
+        for (int j = i; j < numRows; j++) {
+          result.add(null);
+        }
+        break;
+      }
+
+      if (indexOfLastNonNullValue != -1) {
+        result.add(extractValueFromRow(rows.get(indexOfLastNonNullValue)));
+      } else {
+        result.add(null);
+      }
+
+      // Slide the window forward by one row; check if indexOfLastNonNullValue 
is the lower bound which will not be in
+      // the next window. If so, update it to -1 since the window no longer 
has any non-null values.
+      if (indexOfLastNonNullValue == lowerBound) {
+        indexOfLastNonNullValue = -1;
+      }
+      lowerBound++;
+
+      // After the lower bound is updated, we also update the upper bound for 
the next window. The upper bound is only
+      // incremented if we're not already at the row boundary. If the upper 
bound is incremented, we also need to update
+      // indexOfLastNonNullValue to the new upper bound if it contains a 
non-null value.
+      if (upperBound < numRows - 1) {
+        upperBound++;
+        if (upperBound >= 0) {
+          Object value = extractValueFromRow(rows.get(upperBound));
+          if (value != null) {
+            indexOfLastNonNullValue = upperBound;
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+
   private List<Object> processRangeWindow(List<Object[]> rows) {
+    int numRows = rows.size();
     if (_windowFrame.isUnboundedFollowing()) {
-      return processUnboundedFollowing(rows);
+      return fillAllWithValue(rows, extractValueFromRow(rows.get(numRows - 
1)));
     }
 
     // The upper bound has to be CURRENT ROW here since we don't support RANGE 
windows with offset value
     Preconditions.checkState(_windowFrame.isUpperBoundCurrentRow(),
         "RANGE window frame with offset PRECEDING / FOLLOWING is not 
supported");
 
-    int numRows = rows.size();
     List<Object> result = new ArrayList<>(numRows);
     Map<Key, Object> lastValueForKey = new HashMap<>();
 
@@ -117,12 +173,103 @@ public class LastValueWindowFunction extends 
ValueWindowFunction {
     return result;
   }
 
-  private List<Object> processUnboundedFollowing(List<Object[]> rows) {
+  private List<Object> processRangeWindowIgnoreNulls(List<Object[]> rows) {
     int numRows = rows.size();
-    assert numRows > 0;
-    Object value = extractValueFromRow(rows.get(numRows - 1));
-    Object[] result = new Object[numRows];
-    Arrays.fill(result, value);
-    return Arrays.asList(result);
+
+    if (_windowFrame.isUnboundedPreceding() && 
_windowFrame.isUnboundedFollowing()) {
+      // Find the last non-null value and fill it in all rows
+      int indexOfLastNonNullValue = indexOfLastNonNullValueInWindow(rows, 0, 
numRows - 1);
+      if (indexOfLastNonNullValue == -1) {
+        // There's no non-null value
+        return Collections.nCopies(numRows, null);
+      } else {
+        return fillAllWithValue(rows, 
extractValueFromRow(rows.get(indexOfLastNonNullValue)));
+      }
+    }
+
+    if (_windowFrame.isUnboundedPreceding() && 
_windowFrame.isUpperBoundCurrentRow()) {
+      List<Object> result = new ArrayList<>(numRows);
+      Map<Key, Object> lastValueForKey = new HashMap<>();
+      Object lastNonNullValue = null;
+
+      for (Object[] row : rows) {
+        Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys);
+        Object value = extractValueFromRow(row);
+
+        if (value != null) {
+          lastValueForKey.put(orderKey, value);
+          lastNonNullValue = value;
+        } else {
+          lastValueForKey.putIfAbsent(orderKey, lastNonNullValue);
+        }
+      }
+
+      for (Object[] row : rows) {
+        result.add(lastValueForKey.get(AggregationUtils.extractRowKey(row, 
_orderKeys)));
+      }
+
+      return result;
+    }
+
+    if (_windowFrame.isLowerBoundCurrentRow() && 
_windowFrame.isUpperBoundCurrentRow()) {
+      List<Object> result = new ArrayList<>(numRows);
+      Map<Key, Object> lastValueForKey = new HashMap<>();
+
+      for (Object[] row : rows) {
+        Key orderKey = AggregationUtils.extractRowKey(row, _orderKeys);
+        Object value = extractValueFromRow(row);
+
+        if (value != null) {
+          lastValueForKey.put(orderKey, value);
+        }
+      }
+
+      for (Object[] row : rows) {
+        result.add(lastValueForKey.get(AggregationUtils.extractRowKey(row, 
_orderKeys)));
+      }
+
+      return result;
+    }
+
+    if (_windowFrame.isLowerBoundCurrentRow() && 
_windowFrame.isUnboundedFollowing()) {
+      // Get last non-null value and fill it in all rows from the first row 
till the last row of the peer group of the
+      // row with the non-null value
+      int indexOfLastNonNullValue = indexOfLastNonNullValueInWindow(rows, 0, 
numRows - 1);
+      Key lastNonNullValueKey;
+
+      // No non-null values
+      if (indexOfLastNonNullValue == -1) {
+        return Collections.nCopies(numRows, null);
+      } else {
+        lastNonNullValueKey = 
AggregationUtils.extractRowKey(rows.get(indexOfLastNonNullValue), _orderKeys);
+      }
+
+      // Find the end of the peer group of the last row with the non-null value
+      int fillBoundary;
+      for (fillBoundary = indexOfLastNonNullValue + 1; fillBoundary < numRows; 
fillBoundary++) {
+        if (!AggregationUtils.extractRowKey(rows.get(fillBoundary), 
_orderKeys).equals(lastNonNullValueKey)) {
+          break;
+        }
+      }
+
+      Object lastNonNullValue = 
extractValueFromRow(rows.get(indexOfLastNonNullValue));
+      return new DualValueList<>(lastNonNullValue, fillBoundary, null, numRows 
- fillBoundary);
+    }
+
+    throw new IllegalStateException("RANGE window frame with offset PRECEDING 
/ FOLLOWING is not supported");
+  }
+
+  /**
+   * Both lowerBound and upperBound should be valid values for the given row 
set. The returned value is -1 if there is
+   * no non-null value in the window.
+   */
+  private int indexOfLastNonNullValueInWindow(List<Object[]> rows, int 
lowerBound, int upperBound) {
+    for (int i = upperBound; i >= lowerBound; i--) {
+      Object value = extractValueFromRow(rows.get(i));
+      if (value != null) {
+        return i;
+      }
+    }
+    return -1;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
index ec9fed8ace..ed3adf03c7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
@@ -46,7 +46,7 @@ public class LeadValueWindowFunction extends 
ValueWindowFunction {
     if (numOperands > 1) {
       RexExpression secondOperand = operands.get(1);
       Preconditions.checkArgument(secondOperand instanceof 
RexExpression.Literal,
-          "Second operand (offset) of LAG function must be a literal");
+          "Second operand (offset) of LEAD function must be a literal");
       Object offsetValue = ((RexExpression.Literal) secondOperand).getValue();
       if (offsetValue instanceof Number) {
         offset = ((Number) offsetValue).intValue();
@@ -55,7 +55,7 @@ public class LeadValueWindowFunction extends 
ValueWindowFunction {
     if (numOperands == 3) {
       RexExpression thirdOperand = operands.get(2);
       Preconditions.checkArgument(thirdOperand instanceof 
RexExpression.Literal,
-          "Third operand (default value) of LAG function must be a literal");
+          "Third operand (default value) of LEAD function must be a literal");
       RexExpression.Literal defaultValueLiteral = (RexExpression.Literal) 
thirdOperand;
       defaultValue = defaultValueLiteral.getValue();
       if (defaultValue != null) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java
index 33ac71c815..ce8a00d432 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/ValueWindowFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.operator.window.value;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelFieldCollation;
@@ -40,8 +41,17 @@ public abstract class ValueWindowFunction extends 
WindowFunction {
           .build();
   //@formatter:on
 
+  protected final boolean _ignoreNulls;
+
   public ValueWindowFunction(RexExpression.FunctionCall aggCall, DataSchema 
inputSchema,
       List<RelFieldCollation> collations, WindowFrame windowFrame) {
     super(aggCall, inputSchema, collations, windowFrame);
+    _ignoreNulls = aggCall.isIgnoreNulls();
+  }
+
+  protected List<Object> fillAllWithValue(List<Object[]> rows, Object value) {
+    int numRows = rows.size();
+    assert numRows > 0;
+    return Collections.nCopies(numRows, value);
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index a67726cd90..abb38b8d7d 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -533,7 +533,6 @@ public class WindowAggregateOperatorTest {
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, keys, Map.of(
         1, List.of(
             new Object[]{1, "foo", 1, null},
@@ -548,7 +547,6 @@ public class WindowAggregateOperatorTest {
             new Object[]{3, "and", 3, null},
             new Object[]{3, "true", null, 3})
     ));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
@@ -581,7 +579,6 @@ public class WindowAggregateOperatorTest {
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, keys, Map.of(
         1, List.of(
             new Object[]{1, "foo", 1, 200},
@@ -596,14 +593,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{3, "and", 100, 200},
             new Object[]{3, "true", 100, 3})
     ));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testSumWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType
 frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, 
frameType, Integer.MIN_VALUE, Integer.MAX_VALUE,
         getSum(new RexExpression.InputRef(1)),
@@ -615,13 +610,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then (result should be the same for both window frame types):
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 59.0},
@@ -632,15 +625,13 @@ public class WindowAggregateOperatorTest {
         "B", List.of(
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 30.0}
-    )));
-    //@formatter:on
+        )));
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testSumWithUnboundedPrecedingLowerAndCurrentRowUpper(WindowNode.WindowFrameType 
frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, 
frameType, Integer.MIN_VALUE, 0,
         getSum(new RexExpression.InputRef(1)),
@@ -652,13 +643,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14.0},
@@ -670,14 +659,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10.0},
             new Object[]{"B", 20, 2005, 30.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithUnboundedPrecedingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, 2,
         getSum(new RexExpression.InputRef(1)),
@@ -689,13 +676,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 44.0},
@@ -707,14 +692,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 30.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, -2,
         getSum(new RexExpression.InputRef(1)),
@@ -726,13 +709,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, null},
@@ -744,14 +725,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testSumWithCurrentRowLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType 
frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, 
frameType, 0, Integer.MAX_VALUE,
         getSum(new RexExpression.InputRef(1)),
@@ -763,13 +742,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 59.0},
@@ -781,14 +758,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 20.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testSumWithCurrentRowLowerAndCurrentRowUpper(WindowNode.WindowFrameType 
frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, 
frameType, 0, 0,
         getSum(new RexExpression.InputRef(1)),
@@ -800,13 +775,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14.0},
@@ -818,14 +791,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10.0},
             new Object[]{"B", 20, 2005, 20.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithCurrentRowLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
0, 2,
         getSum(new RexExpression.InputRef(1)),
@@ -837,13 +808,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 44.0},
@@ -855,14 +824,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 20.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithOffsetPrecedingLowerAndUnboundedFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
-1, Integer.MAX_VALUE,
         getSum(new RexExpression.InputRef(1)),
@@ -874,13 +841,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 59.0},
@@ -892,14 +857,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 30.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithOffsetFollowingLowerAndUnboundedFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
1, Integer.MAX_VALUE,
         getSum(new RexExpression.InputRef(1)),
@@ -911,13 +874,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 45.0},
@@ -929,14 +890,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20.0},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithOffsetPrecedingLowerAndCurrentRowUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
-2, 0,
         getSum(new RexExpression.InputRef(1)),
@@ -948,13 +907,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14.0},
@@ -966,14 +923,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10.0},
             new Object[]{"B", 20, 2005, 30.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithOffsetPrecedingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
-1, 2,
         getSum(new RexExpression.InputRef(1)),
@@ -985,13 +940,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 44.0},
@@ -1003,14 +956,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 30.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithVeryLargeOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         // Verify if overflows are handled correctly
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
-1, 2147483646,
@@ -1023,13 +974,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 59.0},
@@ -1041,14 +990,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 30.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithVeryLargeOffsetFollowingLower() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         // Verify if overflows are handled correctly
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
2147483646, 2147483647,
@@ -1061,13 +1008,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, null},
@@ -1079,14 +1024,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithOffsetPrecedingLowerAndOffsetPrecedingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
-3, -2,
         getSum(new RexExpression.InputRef(1)),
@@ -1098,13 +1041,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, null},
@@ -1116,14 +1057,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithOffsetFollowingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 2, ROWS, 
1, 2,
         getSum(new RexExpression.InputRef(1)),
@@ -1135,13 +1074,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 30.0},
@@ -1153,14 +1090,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20.0},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testSumWithSamePartitionAndCollationKey() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, DOUBLE, List.of(0), 0, RANGE, 
Integer.MIN_VALUE, 0,
         getSum(new RexExpression.InputRef(1)),
@@ -1172,13 +1107,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 59.0},
@@ -1190,14 +1123,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 30.0},
             new Object[]{"B", 20, 2005, 30.0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testMinWithRowsWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 
1,
         getMin(new RexExpression.InputRef(1)),
@@ -1209,13 +1140,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 10},
@@ -1227,14 +1156,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 10}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testMinWithRangeWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, RANGE, 0, 
Integer.MAX_VALUE,
         getMin(new RexExpression.InputRef(1)),
@@ -1247,13 +1174,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", null, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 10},
@@ -1266,14 +1191,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", null, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testMaxWithRowsWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 2,
         getMax(new RexExpression.InputRef(1)),
@@ -1285,13 +1208,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 20, 2000, 20},
@@ -1303,14 +1224,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testMaxWithRangeWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, RANGE, 0, 
0,
         getMax(new RexExpression.InputRef(1)),
@@ -1323,13 +1242,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 20, 2000},
             new Object[]{"B", null, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1342,14 +1259,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 20, 2000, 20},
             new Object[]{"B", null, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testBoolAndWithRowsWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, 
ROWS, -2, -1,
         getBoolAnd(new RexExpression.InputRef(1)),
@@ -1362,13 +1277,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000},
             new Object[]{"B", 0, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 0, 2000, null},
@@ -1381,14 +1294,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000, null},
             new Object[]{"B", 0, 2005, 1}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testBoolAndWithRangeWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, 
RANGE, Integer.MIN_VALUE, 0,
         getBoolAnd(new RexExpression.InputRef(1)),
@@ -1401,13 +1312,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000},
             new Object[]{"B", 0, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 0, 2000, 0},
@@ -1420,14 +1329,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000, 1},
             new Object[]{"B", 0, 2005, 0}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testBoolOrWithRowsWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, 
ROWS, 1, 2,
         getBoolOr(new RexExpression.InputRef(1)),
@@ -1440,13 +1347,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000},
             new Object[]{"B", 0, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 0, 2000, 1},
@@ -1459,14 +1364,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000, 0},
             new Object[]{"B", 0, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testBoolOrWithRangeWindow() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, BOOLEAN, INT}, BOOLEAN, List.of(0), 2, 
RANGE, Integer.MIN_VALUE, Integer.MAX_VALUE,
         getBoolOr(new RexExpression.InputRef(1)),
@@ -1479,13 +1382,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000},
             new Object[]{"B", 0, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 0, 2000, 1},
@@ -1498,14 +1399,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 1, 2000, 1},
             new Object[]{"B", 0, 2005, 1}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testFirstValueWithUnboundedPrecedingLowerAndCurrentRowUpper(WindowNode.WindowFrameType
 frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, 0,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1517,13 +1416,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1535,7 +1432,6 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 10}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
@@ -1543,7 +1439,6 @@ public class WindowAggregateOperatorTest {
   public void 
testFirstValueWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(
       WindowNode.WindowFrameType frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1555,13 +1450,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1573,14 +1466,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 10}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void 
testFirstValueWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, -2,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1592,13 +1483,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, null},
@@ -1610,14 +1499,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void 
testFirstValueWithUnboundedPrecedingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, 2,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1629,13 +1516,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1647,14 +1532,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 10}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testFirstValueWithCurrentRowLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType
 frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, Integer.MAX_VALUE,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1666,13 +1549,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1684,14 +1565,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testFirstValueWithCurrentRowLowerAndCurrentRowUpper(WindowNode.WindowFrameType 
frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, 0,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1703,13 +1582,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1721,14 +1598,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testFirstValueWithCurrentRowLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 2,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1740,13 +1615,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1758,14 +1631,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testFirstValueWithOffsetPrecedingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 
2,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1777,13 +1648,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1795,14 +1664,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 10}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testFirstValueWithOffsetPrecedingLowerAndOffsetPrecedingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -2, 
-1,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1814,13 +1681,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, null},
@@ -1832,14 +1697,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, 10}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testFirstValueWithOffsetFollowingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 2, 3,
         getFirstValue(new RexExpression.InputRef(1)),
@@ -1851,13 +1714,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 20},
@@ -1869,14 +1730,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testLastValueWithUnboundedPrecedingLowerAndCurrentRowUpper(WindowNode.WindowFrameType
 frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, 0,
         getLastValue(new RexExpression.InputRef(1)),
@@ -1888,13 +1747,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -1906,7 +1763,6 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
@@ -1914,7 +1770,6 @@ public class WindowAggregateOperatorTest {
   public void 
testLastValueWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(
       WindowNode.WindowFrameType frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
         getLastValue(new RexExpression.InputRef(1)),
@@ -1926,13 +1781,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 15},
@@ -1944,14 +1797,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void 
testLastValueWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, -2,
         getLastValue(new RexExpression.InputRef(1)),
@@ -1963,13 +1814,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, null},
@@ -1981,14 +1830,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void 
testLastValueWithUnboundedPrecedingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, 2,
         getLastValue(new RexExpression.InputRef(1)),
@@ -2000,13 +1847,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 20},
@@ -2018,14 +1863,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testLastValueWithCurrentRowLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType
 frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, Integer.MAX_VALUE,
         getLastValue(new RexExpression.InputRef(1)),
@@ -2037,13 +1880,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 15},
@@ -2055,14 +1896,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test(dataProvider = "windowFrameTypes")
   public void 
testLastValueWithCurrentRowLowerAndCurrentRowUpper(WindowNode.WindowFrameType 
frameType) {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, 0,
         getLastValue(new RexExpression.InputRef(1)),
@@ -2074,13 +1913,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 14},
@@ -2092,14 +1929,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 10},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testLastValueWithCurrentRowLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 2,
         getLastValue(new RexExpression.InputRef(1)),
@@ -2111,13 +1946,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 20},
@@ -2129,14 +1962,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testLastValueWithOffsetPrecedingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 
2,
         getLastValue(new RexExpression.InputRef(1)),
@@ -2148,13 +1979,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 20},
@@ -2166,14 +1995,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20},
             new Object[]{"B", 20, 2005, 20}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testLastValueWithOffsetPrecedingLowerAndOffsetPrecedingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -2, 
-1,
         getLastValue(new RexExpression.InputRef(1)),
@@ -2185,13 +2012,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, null},
@@ -2203,14 +2028,12 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, null},
             new Object[]{"B", 20, 2005, 10}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
   public void testLastValueWithOffsetFollowingLowerAndOffsetFollowingUpper() {
     // Given:
-    //@formatter:off
     WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
         new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 1, 3,
         getLastValue(new RexExpression.InputRef(1)),
@@ -2222,13 +2045,11 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000},
             new Object[]{"B", 20, 2005}
         });
-    //@formatter:on
 
     // When:
     List<Object[]> resultRows = operator.nextBlock().getContainer();
 
     // Then:
-    //@formatter:off
     verifyResultRows(resultRows, List.of(0), Map.of(
         "A", List.of(
             new Object[]{"A", 14, 2000, 15},
@@ -2240,119 +2061,820 @@ public class WindowAggregateOperatorTest {
             new Object[]{"B", 10, 2000, 20},
             new Object[]{"B", 20, 2005, null}
         )));
-    //@formatter:on
     assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
-  private WindowAggregateOperator prepareDataForWindowFunction(String[] 
inputSchemaCols,
-      ColumnDataType[] inputSchemaColTypes, ColumnDataType outputType, 
List<Integer> partitionKeys,
-      int collationFieldIndex, WindowNode.WindowFrameType frameType, int 
windowFrameLowerBound,
-      int windowFrameUpperBound, RexExpression.FunctionCall functionCall, 
Object[][] rows) {
-    DataSchema inputSchema = new DataSchema(inputSchemaCols, 
inputSchemaColTypes);
-    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
rows))
-        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+  @Test(dataProvider = "windowFrameTypes")
+  public void 
testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndCurrentRowUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, 0,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", 20, 2005}
+        });
 
-    String[] outputSchemaCols = new String[inputSchemaCols.length + 1];
-    System.arraycopy(inputSchemaCols, 0, outputSchemaCols, 0, 
inputSchemaCols.length);
-    outputSchemaCols[inputSchemaCols.length] = 
functionCall.getFunctionName().toLowerCase();
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
-    ColumnDataType[] outputSchemaColTypes = new 
ColumnDataType[inputSchemaColTypes.length + 1];
-    System.arraycopy(inputSchemaColTypes, 0, outputSchemaColTypes, 0, 
inputSchemaColTypes.length);
-    outputSchemaColTypes[inputSchemaCols.length] = outputType;
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, null},
+            new Object[]{"A", null, 2002, frameType == ROWS ? null : 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", 20, 2008, 10},
+            new Object[]{"A", null, 2008, 10}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", 20, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
 
-    DataSchema resultSchema = new DataSchema(outputSchemaCols, 
outputSchemaColTypes);
-    List<RexExpression.FunctionCall> aggCalls = List.of(functionCall);
-    List<RelFieldCollation> collations = List.of(new 
RelFieldCollation(collationFieldIndex));
-    return getOperator(inputSchema, resultSchema, partitionKeys, collations, 
aggCalls, frameType, windowFrameLowerBound,
-        windowFrameUpperBound);
+  @Test(dataProvider = "windowFrameTypes")
+  public void 
testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", 20, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, 10},
+            new Object[]{"A", null, 2002, 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", 20, 2008, 10},
+            new Object[]{"A", null, 2008, 10}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", 20, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
-  public void testShouldThrowOnWindowFrameWithInvalidOffsetBounds() {
+  public void 
testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() {
     // Given:
-    DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
-    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{2, "foo"}))
-        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
-    DataSchema resultSchema =
-        new DataSchema(new String[]{"group", "arg", "sum"}, new 
ColumnDataType[]{INT, STRING, DOUBLE});
-    List<Integer> keys = List.of(0);
-    List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new 
RexExpression.InputRef(1)));
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, -1,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
 
-    // Then:
-    IllegalStateException e = Assert.expectThrows(IllegalStateException.class,
-        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, ROWS, 5, 2));
-    assertEquals(e.getMessage(), "Window frame lower bound can't be greater 
than upper bound");
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
-    e = Assert.expectThrows(IllegalStateException.class,
-        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, ROWS, -2, -3));
-    assertEquals(e.getMessage(), "Window frame lower bound can't be greater 
than upper bound");
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, null},
+            new Object[]{"A", null, 2002, null},
+            new Object[]{"A", 10, 2002, null},
+            new Object[]{"A", 20, 2008, 10},
+            new Object[]{"A", null, 2008, 10}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, null},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
   @Test
-  public void testShouldThrowOnWindowFrameWithOffsetBoundsForRange() {
-    // TODO: Remove this test when support for RANGE window frames with offset 
PRECEDING / FOLLOWING is added
+  public void 
testFirstValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetFollowingUpper() {
     // Given:
-    DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
-    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{2, "foo"}))
-        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
-    DataSchema resultSchema =
-        new DataSchema(new String[]{"group", "arg", "sum"}, new 
ColumnDataType[]{INT, STRING, DOUBLE});
-    List<Integer> keys = List.of(0);
-    List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new 
RexExpression.InputRef(1)));
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, 1,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
 
-    // Then:
-    IllegalStateException e = Assert.expectThrows(IllegalStateException.class,
-        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, WindowNode.WindowFrameType.RANGE, 5,
-            Integer.MAX_VALUE));
-    assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / 
FOLLOWING is not supported");
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
-    e = Assert.expectThrows(IllegalStateException.class,
-        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, WindowNode.WindowFrameType.RANGE,
-            Integer.MAX_VALUE, 5));
-    assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / 
FOLLOWING is not supported");
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, null},
+            new Object[]{"A", null, 2002, 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", 20, 2008, 10},
+            new Object[]{"A", null, 2008, 10}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
-  private WindowAggregateOperator getOperator(DataSchema inputSchema, 
DataSchema resultSchema, List<Integer> keys,
-      List<RelFieldCollation> collations, List<RexExpression.FunctionCall> 
aggCalls,
-      WindowNode.WindowFrameType windowFrameType, int lowerBound, int 
upperBound, PlanNode.NodeHint nodeHint) {
-    return new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, inputSchema,
-        new WindowNode(-1, resultSchema, nodeHint, List.of(), keys, 
collations, aggCalls, windowFrameType, lowerBound,
-            upperBound, List.of()));
-  }
+  @Test(dataProvider = "windowFrameTypes")
+  public void 
testFirstValueIgnoreNullsWithCurrentRowLowerAndUnboundedFollowingUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, Integer.MAX_VALUE,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
 
-  private WindowAggregateOperator getOperator(DataSchema inputSchema, 
DataSchema resultSchema, List<Integer> keys,
-      List<RelFieldCollation> collations, List<RexExpression.FunctionCall> 
aggCalls,
-      WindowNode.WindowFrameType windowFrameType, int lowerBound, int 
upperBound) {
-    return getOperator(inputSchema, resultSchema, keys, collations, aggCalls, 
windowFrameType, lowerBound, upperBound,
-        PlanNode.NodeHint.EMPTY);
-  }
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
-  private static RexExpression.FunctionCall getSum(RexExpression arg) {
-    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.SUM.name(), List.of(arg));
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, 10},
+            new Object[]{"A", null, 2002, 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", 20, 2008, 20},
+            new Object[]{"A", null, 2008, frameType == ROWS ? null : 20}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, null}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
-  private static RexExpression.FunctionCall getMin(RexExpression arg) {
-    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.MIN.name(), List.of(arg));
-  }
+  @Test(dataProvider = "windowFrameTypes")
+  public void testFirstValueIgnoreNullsWithCurrentRowLowerAndCurrentRowUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, 0,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
 
-  private static RexExpression.FunctionCall getMax(RexExpression arg) {
-    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.MAX.name(), List.of(arg));
-  }
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
 
-  private static RexExpression.FunctionCall getBoolAnd(RexExpression arg) {
-    return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLAND", 
List.of(arg));
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, null},
+            new Object[]{"A", null, 2002, frameType == ROWS ? null : 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", 20, 2008, 20},
+            new Object[]{"A", null, 2008, frameType == ROWS ? null : 20}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, null}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
-  private static RexExpression.FunctionCall getBoolOr(RexExpression arg) {
-    return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLOR", 
List.of(arg));
-  }
+  @Test
+  public void 
testFirstValueIgnoreNullsWithCurrentRowLowerAndOffsetFollowingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 1,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
 
-  private static RexExpression.FunctionCall getFirstValue(RexExpression arg) {
-    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.FIRST_VALUE.name(), List.of(arg));
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, null},
+            new Object[]{"A", null, 2002, 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", 20, 2008, 20},
+            new Object[]{"A", null, 2008, null}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, null}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
   }
 
-  private static RexExpression.FunctionCall getLastValue(RexExpression arg) {
-    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.LAST_VALUE.name(), List.of(arg));
+  @Test
+  public void 
testFirstValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetFollowingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 
1,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, null},
+            new Object[]{"A", null, 2002, 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", 20, 2008, 10},
+            new Object[]{"A", null, 2008, 20}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testFirstValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetPrecedingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -2, 
-1,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, null},
+            new Object[]{"A", null, 2002, null},
+            new Object[]{"A", 10, 2002, null},
+            new Object[]{"A", 20, 2008, 10},
+            new Object[]{"A", null, 2008, 10}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, null},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testFirstValueIgnoreNullsWithOffsetFollowingLowerAndOffsetFollowingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 1, 3,
+        getFirstValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", null, 2000},
+            new Object[]{"A", null, 2002},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 20, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", null, 2000, 10},
+            new Object[]{"A", null, 2002, 10},
+            new Object[]{"A", 10, 2002, 20},
+            new Object[]{"A", 20, 2008, null},
+            new Object[]{"A", null, 2008, null}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, null},
+            new Object[]{"B", null, 2005, null}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test(dataProvider = "windowFrameTypes")
+  public void 
testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndCurrentRowUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, 0,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 14},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", null, 2008, frameType == ROWS ? 10 : 15},
+            new Object[]{"A", 15, 2008, 15}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test(dataProvider = "windowFrameTypes")
+  public void 
testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
Integer.MIN_VALUE, Integer.MAX_VALUE,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 15},
+            new Object[]{"A", 10, 2002, 15},
+            new Object[]{"A", 15, 2008, 15},
+            new Object[]{"A", null, 2008, 15}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetPrecedingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, -1,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, null},
+            new Object[]{"A", 10, 2002, 14},
+            new Object[]{"A", null, 2008, 10},
+            new Object[]{"A", 15, 2008, 10}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, null},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testLastValueIgnoreNullsWithUnboundedPrecedingLowerAndOffsetFollowingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 
Integer.MIN_VALUE, 2,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 10},
+            new Object[]{"A", 10, 2002, 15},
+            new Object[]{"A", null, 2008, 15},
+            new Object[]{"A", 15, 2008, 15}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test(dataProvider = "windowFrameTypes")
+  public void 
testLastValueIgnoreNullsWithCurrentRowLowerAndUnboundedFollowingUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, Integer.MAX_VALUE,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", null, 2010},
+            new Object[]{"B", null, 2000},
+            new Object[]{"B", 10, 2005}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 15},
+            new Object[]{"A", 10, 2002, 15},
+            new Object[]{"A", 15, 2008, 15},
+            new Object[]{"A", null, 2008, frameType == ROWS ? null : 15},
+            new Object[]{"A", null, 2010, null}
+        ),
+        "B", List.of(
+            new Object[]{"B", null, 2000, 10},
+            new Object[]{"B", 10, 2005, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test(dataProvider = "windowFrameTypes")
+  public void testLastValueIgnoreNullsWithCurrentRowLowerAndCurrentRowUpper(
+      WindowNode.WindowFrameType frameType) {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, frameType, 
0, 0,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2000},
+            new Object[]{"B", null, 2008}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 14},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", null, 2008, frameType == ROWS ? null : 15},
+            new Object[]{"A", 15, 2008, 15}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2000, frameType == ROWS ? null : 10},
+            new Object[]{"B", null, 2008, null}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testLastValueIgnoreNullsWithCurrentRowLowerAndOffsetFollowingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 0, 1,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2008}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", null, 2008, 15},
+            new Object[]{"A", 15, 2008, 15}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2008, null}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testLastValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetFollowingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -1, 
1,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2008}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 10},
+            new Object[]{"A", 10, 2002, 10},
+            new Object[]{"A", null, 2008, 15},
+            new Object[]{"A", 15, 2008, 15}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, 10},
+            new Object[]{"B", null, 2008, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testLastValueIgnoreNullsWithOffsetPrecedingLowerAndOffsetPrecedingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, -2, 
-1,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2008}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, null},
+            new Object[]{"A", 10, 2002, 14},
+            new Object[]{"A", null, 2008, 10},
+            new Object[]{"A", 15, 2008, 10}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, null},
+            new Object[]{"B", null, 2008, 10}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  @Test
+  public void 
testLastValueIgnoreNullsWithOffsetFollowingLowerAndOffsetFollowingUpper() {
+    // Given:
+    WindowAggregateOperator operator = prepareDataForWindowFunction(new 
String[]{"name", "value", "year"},
+        new ColumnDataType[]{STRING, INT, INT}, INT, List.of(0), 2, ROWS, 1, 2,
+        getLastValue(new RexExpression.InputRef(1), true),
+        new Object[][]{
+            new Object[]{"A", 14, 2000},
+            new Object[]{"A", 10, 2002},
+            new Object[]{"A", null, 2008},
+            new Object[]{"A", 15, 2008},
+            new Object[]{"B", 10, 2000},
+            new Object[]{"B", null, 2008}
+        });
+
+    // When:
+    List<Object[]> resultRows = operator.nextBlock().getContainer();
+
+    // Then:
+    verifyResultRows(resultRows, List.of(0), Map.of(
+        "A", List.of(
+            new Object[]{"A", 14, 2000, 10},
+            new Object[]{"A", 10, 2002, 15},
+            new Object[]{"A", null, 2008, 15},
+            new Object[]{"A", 15, 2008, null}
+        ),
+        "B", List.of(
+            new Object[]{"B", 10, 2000, null},
+            new Object[]{"B", null, 2008, null}
+        )));
+    assertTrue(operator.nextBlock().isSuccessfulEndOfStreamBlock(), "Second 
block is EOS (done processing)");
+  }
+
+  private WindowAggregateOperator prepareDataForWindowFunction(String[] 
inputSchemaCols,
+      ColumnDataType[] inputSchemaColTypes, ColumnDataType outputType, 
List<Integer> partitionKeys,
+      int collationFieldIndex, WindowNode.WindowFrameType frameType, int 
windowFrameLowerBound,
+      int windowFrameUpperBound, RexExpression.FunctionCall functionCall, 
Object[][] rows) {
+    DataSchema inputSchema = new DataSchema(inputSchemaCols, 
inputSchemaColTypes);
+    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
rows))
+        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+
+    String[] outputSchemaCols = new String[inputSchemaCols.length + 1];
+    System.arraycopy(inputSchemaCols, 0, outputSchemaCols, 0, 
inputSchemaCols.length);
+    outputSchemaCols[inputSchemaCols.length] = 
functionCall.getFunctionName().toLowerCase();
+
+    ColumnDataType[] outputSchemaColTypes = new 
ColumnDataType[inputSchemaColTypes.length + 1];
+    System.arraycopy(inputSchemaColTypes, 0, outputSchemaColTypes, 0, 
inputSchemaColTypes.length);
+    outputSchemaColTypes[inputSchemaCols.length] = outputType;
+
+    DataSchema resultSchema = new DataSchema(outputSchemaCols, 
outputSchemaColTypes);
+    List<RexExpression.FunctionCall> aggCalls = List.of(functionCall);
+    List<RelFieldCollation> collations = List.of(new 
RelFieldCollation(collationFieldIndex));
+    return getOperator(inputSchema, resultSchema, partitionKeys, collations, 
aggCalls, frameType, windowFrameLowerBound,
+        windowFrameUpperBound);
+  }
+
+  @Test
+  public void testShouldThrowOnWindowFrameWithInvalidOffsetBounds() {
+    // Given:
+    DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
+    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{2, "foo"}))
+        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+    DataSchema resultSchema =
+        new DataSchema(new String[]{"group", "arg", "sum"}, new 
ColumnDataType[]{INT, STRING, DOUBLE});
+    List<Integer> keys = List.of(0);
+    List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new 
RexExpression.InputRef(1)));
+
+    // Then:
+    IllegalStateException e = Assert.expectThrows(IllegalStateException.class,
+        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, ROWS, 5, 2));
+    assertEquals(e.getMessage(), "Window frame lower bound can't be greater 
than upper bound");
+
+    e = Assert.expectThrows(IllegalStateException.class,
+        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, ROWS, -2, -3));
+    assertEquals(e.getMessage(), "Window frame lower bound can't be greater 
than upper bound");
+  }
+
+  @Test
+  public void testShouldThrowOnWindowFrameWithOffsetBoundsForRange() {
+    // TODO: Remove this test when support for RANGE window frames with offset 
PRECEDING / FOLLOWING is added
+    // Given:
+    DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
+    when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema, 
new Object[]{2, "foo"}))
+        
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
+    DataSchema resultSchema =
+        new DataSchema(new String[]{"group", "arg", "sum"}, new 
ColumnDataType[]{INT, STRING, DOUBLE});
+    List<Integer> keys = List.of(0);
+    List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new 
RexExpression.InputRef(1)));
+
+    // Then:
+    IllegalStateException e = Assert.expectThrows(IllegalStateException.class,
+        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, WindowNode.WindowFrameType.RANGE, 5,
+            Integer.MAX_VALUE));
+    assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / 
FOLLOWING is not supported");
+
+    e = Assert.expectThrows(IllegalStateException.class,
+        () -> getOperator(inputSchema, resultSchema, keys, List.of(), 
aggCalls, WindowNode.WindowFrameType.RANGE,
+            Integer.MAX_VALUE, 5));
+    assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING / 
FOLLOWING is not supported");
+  }
+
+  private WindowAggregateOperator getOperator(DataSchema inputSchema, 
DataSchema resultSchema, List<Integer> keys,
+      List<RelFieldCollation> collations, List<RexExpression.FunctionCall> 
aggCalls,
+      WindowNode.WindowFrameType windowFrameType, int lowerBound, int 
upperBound, PlanNode.NodeHint nodeHint) {
+    return new WindowAggregateOperator(OperatorTestUtil.getTracingContext(), 
_input, inputSchema,
+        new WindowNode(-1, resultSchema, nodeHint, List.of(), keys, 
collations, aggCalls, windowFrameType, lowerBound,
+            upperBound, List.of()));
+  }
+
+  private WindowAggregateOperator getOperator(DataSchema inputSchema, 
DataSchema resultSchema, List<Integer> keys,
+      List<RelFieldCollation> collations, List<RexExpression.FunctionCall> 
aggCalls,
+      WindowNode.WindowFrameType windowFrameType, int lowerBound, int 
upperBound) {
+    return getOperator(inputSchema, resultSchema, keys, collations, aggCalls, 
windowFrameType, lowerBound, upperBound,
+        PlanNode.NodeHint.EMPTY);
+  }
+
+  private static RexExpression.FunctionCall getSum(RexExpression arg) {
+    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.SUM.name(), List.of(arg));
+  }
+
+  private static RexExpression.FunctionCall getMin(RexExpression arg) {
+    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.MIN.name(), List.of(arg));
+  }
+
+  private static RexExpression.FunctionCall getMax(RexExpression arg) {
+    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.MAX.name(), List.of(arg));
+  }
+
+  private static RexExpression.FunctionCall getBoolAnd(RexExpression arg) {
+    return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLAND", 
List.of(arg));
+  }
+
+  private static RexExpression.FunctionCall getBoolOr(RexExpression arg) {
+    return new RexExpression.FunctionCall(ColumnDataType.INT, "BOOLOR", 
List.of(arg));
+  }
+
+  private static RexExpression.FunctionCall getFirstValue(RexExpression arg) {
+    return getFirstValue(arg, false);
+  }
+
+  private static RexExpression.FunctionCall getFirstValue(RexExpression arg, 
boolean ignoreNulls) {
+    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.FIRST_VALUE.name(), List.of(arg), false,
+        ignoreNulls);
+  }
+
+  private static RexExpression.FunctionCall getLastValue(RexExpression arg) {
+    return getLastValue(arg, false);
+  }
+
+  private static RexExpression.FunctionCall getLastValue(RexExpression arg, 
boolean ignoreNulls) {
+    return new RexExpression.FunctionCall(ColumnDataType.INT, 
SqlKind.LAST_VALUE.name(), List.of(arg), false,
+        ignoreNulls);
   }
 
   private static void verifyResultRows(List<Object[]> resultRows, 
List<Integer> keys,
diff --git 
a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json 
b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json
index dd25411d26..c1cbe9cb0d 100644
--- a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json
+++ b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json
@@ -2388,28 +2388,32 @@
           {"name": "int_col", "type": "INT"},
           {"name": "double_col", "type": "DOUBLE"},
           {"name": "string_col", "type": "STRING"},
-          {"name": "bool_col", "type": "BOOLEAN"}
+          {"name": "bool_col", "type": "BOOLEAN"},
+          {"name": "nullable_int_col", "type": "INT"}
         ],
         "inputs": [
-          [2, 300, "a", true],
-          [2, 400, "a", true],
-          [3, 100, "b", false],
-          [3, 100, "c", true],
-          [100, 1, "b", false],
-          [42, 50.5, "e", true],
-          [42, 42, "d", false],
-          [42, 75, "a", true],
-          [42, 42, "a", false],
-          [42, 50.5, "a", true],
-          [42, 42, "e", false],
-          [-101, 1.01, "c", false],
-          [150, 1.5, "c", false],
-          [150, -1.53, "h", false],
-          [3, 100, "g", true],
-          [2, 400, "c", false]
+          [2, 300, "a", true, 1],
+          [2, 400, "a", true, null],
+          [3, 100, "b", false, null],
+          [3, 100, "c", true, 3],
+          [100, 1, "b", false, 1],
+          [42, 50.5, "e", true, 2],
+          [42, 42, "d", false, null],
+          [42, 75, "a", true, 5],
+          [42, 42, "a", false, 4],
+          [42, 50.5, "a", true, null],
+          [42, 42, "e", false, null],
+          [-101, 1.01, "c", false, 7],
+          [150, 1.5, "c", false, 6],
+          [150, -1.53, "h", false, null],
+          [3, 100, "g", true, 10],
+          [2, 400, "c", false, null]
         ]
       }
     },
+    "extraProps": {
+      "enableColumnBasedNullHandling": true
+    },
     "queries": [
       {
         "description": "Single OVER(PARTITION BY) sum",
@@ -5346,6 +5350,116 @@
           [3, "c", 3],
           [4, "c", 150]
         ]
+      },
+      {
+        "description": "FIRST_VALUE with unbounded window frame",
+        "sql": "SELECT string_col, double_col, nullable_int_col, 
FIRST_VALUE(nullable_int_col) OVER(PARTITION BY string_col ORDER BY double_col 
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM {tbl} ORDER BY 
string_col",
+        "outputs": [
+          ["a", 42.0, 4, 4],
+          ["a", 50.5, null, 4],
+          ["a", 75.0, 5, 4],
+          ["a", 300.0, 1, 4],
+          ["a", 400.0, null, 4],
+          ["b", 1.0, 1, 1],
+          ["b", 100.0, null, 1],
+          ["c", 1.01, 7, 7],
+          ["c", 1.5, 6, 7],
+          ["c", 100.0, 3, 7],
+          ["c", 400.0, null, 7],
+          ["d", 42.0, null, null],
+          ["e", 42.0, null, null],
+          ["e", 50.5, 2, null],
+          ["g", 100.0, 10, 10],
+          ["h", -1.53, null, null]
+        ]
+      },
+      {
+        "description": "FIRST_VALUE with IGNORE NULLS and default window 
frame",
+        "sql": "SELECT string_col, double_col, nullable_int_col, 
FIRST_VALUE(nullable_int_col) IGNORE NULLS OVER(PARTITION BY string_col ORDER 
BY double_col) FROM {tbl} ORDER BY string_col",
+        "outputs": [
+          ["a", 42.0, 4, 4],
+          ["a", 50.5, null, 4],
+          ["a", 75.0, 5, 4],
+          ["a", 300.0, 1, 4],
+          ["a", 400.0, null, 4],
+          ["b", 1.0, 1, 1],
+          ["b", 100.0, null, 1],
+          ["c", 1.01, 7, 7],
+          ["c", 1.5, 6, 7],
+          ["c", 100.0, 3, 7],
+          ["c", 400.0, null, 7],
+          ["d", 42.0, null, null],
+          ["e", 42.0, null, null],
+          ["e", 50.5, 2, 2],
+          ["g", 100.0, 10, 10],
+          ["h", -1.53, null, null]
+        ]
+      },
+      {
+        "description": "LAST_VALUE with RANGE BETWEEN CURRENT ROW AND CURRENT 
ROW window frame",
+        "sql": "SELECT string_col, double_col, nullable_int_col, 
LAST_VALUE(nullable_int_col) OVER(PARTITION BY string_col ORDER BY double_col 
RANGE BETWEEN CURRENT ROW AND CURRENT ROW) FROM {tbl} ORDER BY string_col",
+        "outputs": [
+          ["a", 42.0, 4, 4],
+          ["a", 50.5, null, null],
+          ["a", 75.0, 5, 5],
+          ["a", 300.0, 1, 1],
+          ["a", 400.0, null, null],
+          ["b", 1.0, 1, 1],
+          ["b", 100.0, null, null],
+          ["c", 1.01, 7, 7],
+          ["c", 1.5, 6, 6],
+          ["c", 100.0, 3, 3],
+          ["c", 400.0, null, null],
+          ["d", 42.0, null, null],
+          ["e", 42.0, null, null],
+          ["e", 50.5, 2, 2],
+          ["g", 100.0, 10, 10],
+          ["h", -1.53, null, null]
+        ]
+      },
+      {
+        "description": "LAST_VALUE with IGNORE NULLS and bounded ROWS window 
frame",
+        "sql": "SELECT string_col, double_col, nullable_int_col, 
LAST_VALUE(nullable_int_col) IGNORE NULLS OVER(PARTITION BY string_col ORDER BY 
double_col ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM {tbl} ORDER BY 
string_col",
+        "outputs": [
+          ["a", 42.0, 4, 4],
+          ["a", 50.5, null, 5],
+          ["a", 75.0, 5, 1],
+          ["a", 300.0, 1, 1],
+          ["a", 400.0, null, 1],
+          ["b", 1.0, 1, 1],
+          ["b", 100.0, null, 1],
+          ["c", 1.01, 7, 6],
+          ["c", 1.5, 6, 3],
+          ["c", 100.0, 3, 3],
+          ["c", 400.0, null, 3],
+          ["d", 42.0, null, null],
+          ["e", 42.0, null, 2],
+          ["e", 50.5, 2, 2],
+          ["g", 100.0, 10, 10],
+          ["h", -1.53, null, null]
+        ]
+      },
+      {
+        "description": "LAST_VALUE with RESPECT NULLS and bounded ROWS window 
frame",
+        "sql": "SELECT string_col, double_col, nullable_int_col, 
LAST_VALUE(nullable_int_col) RESPECT NULLS OVER(PARTITION BY string_col ORDER 
BY double_col ROWS BETWEEN 3 PRECEDING AND 1 PRECEDING) FROM {tbl} ORDER BY 
string_col",
+        "outputs": [
+          ["a", 42.0, 4, null],
+          ["a", 50.5, null, 4],
+          ["a", 75.0, 5, null],
+          ["a", 300.0, 1, 5],
+          ["a", 400.0, null, 1],
+          ["b", 1.0, 1, null],
+          ["b", 100.0, null, 1],
+          ["c", 1.01, 7, null],
+          ["c", 1.5, 6, 7],
+          ["c", 100.0, 3, 6],
+          ["c", 400.0, null, 3],
+          ["d", 42.0, null, null],
+          ["e", 42.0, null, null],
+          ["e", 50.5, 2, null],
+          ["g", 100.0, 10, null],
+          ["h", -1.53, null, null]
+        ]
       }
     ]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to