This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ac818fed738 lead/lag window functions should not fill beyond num rows (#17348)
ac818fed738 is described below
commit ac818fed7389209ebcaf2cb17cd402a97f6841a2
Author: Johan Adami <[email protected]>
AuthorDate: Thu Dec 11 04:22:56 2025 -0500
lead/lag window functions should not fill beyond num rows (#17348)
* lead/lag window functions should not fill beyond num rows
* fix checkstyle
* do not overfill; cleanup variable names; update comment
---------
Co-authored-by: Johan Adami <[email protected]>
---
.../window/value/LagValueWindowFunction.java | 5 +-
.../window/value/LeadValueWindowFunction.java | 5 +-
.../operator/WindowAggregateOperatorTest.java | 57 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 2 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
index 86924c31fb5..1e583bf11c6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LagValueWindowFunction.java
@@ -78,7 +78,10 @@ public class LagValueWindowFunction extends
ValueWindowFunction {
int numRows = rows.size();
Object[] result = new Object[numRows];
if (_defaultValue != null) {
- Arrays.fill(result, 0, _offset, _defaultValue);
+ // We only fill up to the minimum of _offset and numRows to handle the case
+ // where the offset is larger than the result size.
+ int fillTo = Math.min(_offset, numRows);
+ Arrays.fill(result, 0, fillTo, _defaultValue);
}
for (int i = _offset; i < numRows; i++) {
result[i] = extractValueFromRow(rows.get(i - _offset));
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
index ed3adf03c74..4ec6c641e4b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/window/value/LeadValueWindowFunction.java
@@ -81,7 +81,10 @@ public class LeadValueWindowFunction extends
ValueWindowFunction {
result[i] = extractValueFromRow(rows.get(i + _offset));
}
if (_defaultValue != null) {
- Arrays.fill(result, numRows - _offset, numRows, _defaultValue);
+ // If an offset is provided beyond the number of rows, fill all with default value
+ // only down to 0.
+ int fillFrom = Math.max(numRows - _offset, 0);
+ Arrays.fill(result, fillFrom, numRows, _defaultValue);
}
return Arrays.asList(result);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index 2882bc7da17..db57f40a068 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -637,6 +637,63 @@ public class WindowAggregateOperatorTest {
assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done
processing)");
}
+ @Test
+ public void testLeadLagWindowFunctionWithOffsetGreaterThanNumberOfRows() {
+ // Given: Test with offset much larger than partition size to verify overflow handling
+ // Input should be in sorted order on the order by key as SortExchange will handle pre-sorting the data
+ DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, STRING});
+ MultiStageOperator input = new
BlockListMultiStageOperator.Builder(inputSchema)
+ .addRow(1, "alpha")
+ .addRow(1, "beta")
+ .addRow(1, "gamma")
+ .addRow(2, "bar")
+ .addRow(2, "foo")
+ .addRow(3, "single")
+ .buildWithEos();
+ DataSchema resultSchema =
+ new DataSchema(
+ new String[]{"group", "arg", "lead_no_default", "lag_no_default",
"lead_with_default", "lag_with_default"},
+ new ColumnDataType[]{INT, STRING, INT, INT, INT, INT});
+ List<Integer> keys = List.of(0);
+ List<RelFieldCollation> collations =
+ List.of(new RelFieldCollation(1,
RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.LAST));
+ List<RexExpression.FunctionCall> aggCalls = List.of(
+ // LEAD with offset 1000, no default value - should return null
+ new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LEAD.name(),
+ List.of(new RexExpression.InputRef(0), new
RexExpression.Literal(ColumnDataType.INT, 1000))),
+ // LAG with offset 1000, no default value - should return null
+ new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LAG.name(),
+ List.of(new RexExpression.InputRef(0), new
RexExpression.Literal(ColumnDataType.INT, 1000))),
+ // LEAD with offset Integer.MAX_VALUE and default value 9999
+ new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LEAD.name(),
+ List.of(new RexExpression.InputRef(0), new
RexExpression.Literal(ColumnDataType.INT, Integer.MAX_VALUE),
+ new RexExpression.Literal(ColumnDataType.INT, 9999))),
+ // LAG with offset Integer.MAX_VALUE and default value 8888
+ new RexExpression.FunctionCall(ColumnDataType.INT, SqlKind.LAG.name(),
+ List.of(new RexExpression.InputRef(0), new
RexExpression.Literal(ColumnDataType.INT, Integer.MAX_VALUE),
+ new RexExpression.Literal(ColumnDataType.INT, 8888))));
+ WindowAggregateOperator operator =
+ getOperator(inputSchema, resultSchema, keys, collations, aggCalls,
WindowNode.WindowFrameType.RANGE,
+ Integer.MIN_VALUE, 0, input);
+
+ // When:
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // Then: All rows should return null or default value since offset exceeds partition size
+ verifyResultRows(resultRows, keys, Map.of(
+ 1, List.of(
+ new Object[]{1, "alpha", null, null, 9999, 8888},
+ new Object[]{1, "beta", null, null, 9999, 8888},
+ new Object[]{1, "gamma", null, null, 9999, 8888}),
+ 2, List.of(
+ new Object[]{2, "bar", null, null, 9999, 8888},
+ new Object[]{2, "foo", null, null, 9999, 8888}),
+ 3, List.<Object[]>of(
+ new Object[]{3, "single", null, null, 9999, 8888})
+ ));
+ assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done
processing)");
+ }
+
@Test(dataProvider = "windowFrameTypes")
public void
testSumWithUnboundedPrecedingLowerAndUnboundedFollowingUpper(WindowNode.WindowFrameType
frameType) {
// Given:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]