This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ac818fed738 lead/lag window functions should not fill beyond num rows 
(#17348)
ac818fed738 is described below

commit ac818fed7389209ebcaf2cb17cd402a97f6841a2
Author: Johan Adami <[email protected]>
AuthorDate: Thu Dec 11 04:22:56 2025 -0500

    lead/lag window functions should not fill beyond num rows (#17348)
    
    * lead/lag window functions should not fill beyond num rows
    
    * fix checkstyle
    
    * do not overfill; cleanup variable names; update comment
    
    ---------
    
    Co-authored-by: Johan Adami <[email protected]>
---
 .../window/value/LagValueWindowFunction.java       |  5 +-
 .../window/value/LeadValueWindowFunction.java      |  5 +-
 .../operator/WindowAggregateOperatorTest.java      | 57 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 2 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
index 86924c31fb5..1e583bf11c6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
@@ -78,7 +78,10 @@ public class LagValueWindowFunction extends 
ValueWindowFunction {
     int numRows = rows.size();
     Object[] result = new Object[numRows];
     if (_defaultValue != null) {
-      Arrays.fill(result, 0, _offset, _defaultValue);
+      // We only fill up to the minimum of _offset and numRows to handle the 
case
+      // where the offset is larger than the result size.
+      int fillTo = Math.min(_offset, numRows);
+      Arrays.fill(result, 0, fillTo, _defaultValue);
     }
     for (int i = _offset; i < numRows; i++) {
       result[i] = extractValueFromRow(rows.get(i - _offset));
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
index ed3adf03c74..4ec6c641e4b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
@@ -81,7 +81,10 @@ public class LeadValueWindowFunction extends 
ValueWindowFunction {
       result[i] = extractValueFromRow(rows.get(i + _offset));
     }
     if (_defaultValue != null) {
-      Arrays.fill(result, numRows - _offset, numRows, _defaultValue);
+      // If the offset exceeds the number of rows, clamp the fill start to 0
+      // so that every row receives the default value.
+      int fillFrom = Math.max(numRows - _offset, 0);
+      Arrays.fill(result, fillFrom, numRows, _defaultValue);
     }
     return Arrays.asList(result);
   }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index 2882bc7da17..db57f40a068 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -637,6 +637,63 @@ public class WindowAggregateOperatorTest {
     assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done 
processing)");
   }
 
+  @Test
+  public void testLeadLagWindowFunctionWithOffsetGreaterThanNumberOfRows() {
+    // Given: Test with offset much larger than partition size to verify 
overflow handling
+    // Input should be in sorted order on the order by key as SortExchange 
will handle pre-sorting the data
+    DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, STRING});
+    MultiStageOperator input = new 
BlockListMultiStageOperator.Builder(inputSchema)
+        .addRow(1, "alpha")
+        .addRow(1, "beta")
+        .addRow(1, "gamma")
+        .addRow(2, "bar")
+        .addRow(2, "foo")
+        .addRow(3, "single")
+        .buildWithEos();
+    DataSchema resultSchema =
+        new DataSchema(
+            new String[]{"group", "arg", "lead_no_default", "lag_no_default", 
"lead_with_default", "lag_with_default"},
+            new ColumnDataType[]{INT, STRING, INT, INT, INT, INT});
+    List<Integer> keys = List.of(0);
+    List<RelFieldCollation> collations =
+        List.of(new RelFieldCollation(1, 
RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.LAST));
+    List<RexExpression.FunctionCall> aggCalls = List.of(
+        // LEAD with offset 1000, no default value - should return null
+        new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LEAD.name(),
+            List.of(new RexExpression.InputRef(0), new 
RexExpression.Literal(ColumnDataType.INT, 1000))),
+        // LAG with offset 1000, no default value - should return null
+        new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LAG.name(),
+            List.of(new RexExpression.InputRef(0), new 
RexExpression.Literal(ColumnDataType.INT, 1000))),
+        // LEAD with offset Integer.MAX_VALUE and default value 9999
+        new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LEAD.name(),
+            List.of(new RexExpression.InputRef(0), new 
RexExpression.Literal(ColumnDataType.INT, Integer.MAX_VALUE),
+                new RexExpression.Literal(ColumnDataType.INT, 9999))),
+        // LAG with offset Integer.MAX_VALUE and default value 8888
+        new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LAG.name(),
+            List.of(new RexExpression.InputRef(0), new 
RexExpression.Literal(ColumnDataType.INT, Integer.MAX_VALUE),
+                new RexExpression.Literal(ColumnDataType.INT, 8888))));
+    WindowAggregateOperator operator =
+        getOperator(inputSchema, resultSchema, keys, collations, aggCalls, 
WindowNode.WindowFrameType.RANGE,
+            Integer.MIN_VALUE, 0, input);
+
+    // When:
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    // Then: All rows should return null or default value since offset exceeds 
partition size
+    verifyResultRows(resultRows, keys, Map.of(
+        1, List.of(
+            new Object[]{1, "alpha", null, null, 9999, 8888},
+            new Object[]{1, "beta", null, null, 9999, 8888},
+            new Object[]{1, "gamma", null, null, 9999, 8888}),
+        2, List.of(
+            new Object[]{2, "bar", null, null, 9999, 8888},
+            new Object[]{2, "foo", null, null, 9999, 8888}),
+        3, List.<Object[]>of(
+            new Object[]{3, "single", null, null, 9999, 8888})
+    ));
+    assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done 
processing)");
+  }
+
   @Test(dataProvider = "windowFrameTypes")
   public void 
testSumWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType
 frameType) {
     // Given:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to