This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 8339f96afa73 [SPARK-56546][SQL][FOLLOWUP] Address review comments in
segment-tree window frame
8339f96afa73 is described below
commit 8339f96afa73704be9f9e8c5452cb85f181b7604
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed May 13 11:51:24 2026 +0800
[SPARK-56546][SQL][FOLLOWUP] Address review comments in segment-tree window
frame
### What changes were proposed in this pull request?
Four small cleanups in the segment-tree moving-frame window code introduced
by #55422:
1. `WindowEvaluatorFactoryBase.scala` -- fix terminology in the `def
processor` comment. The comment says `Keep as def (by-name)`, but `def
processor` is a method whose body is re-evaluated on each call, not a by-name
parameter (`=> T`). Reword to `Keep as def (lazy / per-call)`.
2. `WindowEvaluatorFactoryBase.eligibleForSegTree` -- add a defensive `case
_ => false` to the `frameType match` so future additions to the sealed
`FrameType` trait do not throw a `MatchError` at runtime.
3. `WindowEvaluatorFactoryBase.estimateMaxCachedBlocks` -- add a comment
justifying the `+ 2` slack in the cached-block budget (one boundary block at
each end of the frame's interval), since the magic number was not previously
explained.
4. `WindowSegmentTreeSuite.scala` -- fix indentation of 11 `test(` blocks
that were declared at 4-space indent (inconsistent with the file's 2-space
convention and the 3 other correctly-indented tests in the same file).
A separate follow-up is needed to add RANGE-frame coverage to
`WindowBenchmark` -- the current benchmark is RowFrame-only -- but that
requires regenerating the committed JDK 17/21/25 results files and is deferred.
### Why are the changes needed?
Review-comment-style follow-ups. Pure comment / defensive-default /
whitespace changes -- no behavior change.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests cover all touched code paths. The test indentation fix is
whitespace-only; the comment and `case _` changes have no runtime effect.
### Was this patch authored or co-authored using generative AI tooling?
Yes, Claude assisted in identifying and drafting these cleanups.
Closes #55815 from cloud-fan/cloud-fan/SPARK-56546-followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 407a29c6da0a9e88c2d57226024239c10f80c94e)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../window/WindowEvaluatorFactoryBase.scala | 15 +++++++++++----
.../execution/window/WindowSegmentTreeSuite.scala | 22 +++++++++++-----------
2 files changed, 22 insertions(+), 15 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index cebcfd05bd24..2ae10ce9d711 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -203,10 +203,10 @@ trait WindowEvaluatorFactoryBase {
case WindowExpression(ae: AggregateExpression, _) => ae.filter
case _ => None
}.toArray
- // Keep as `def` (by-name): the FRAME_LESS_OFFSET / UNBOUNDED_OFFSET /
- // UNBOUNDED_PRECEDING_OFFSET branches do not read `processor`. Eager
- // `val` construction would invoke `AggregateProcessor.apply` on
- // Lag / Lead / NthValue and throw
+ // Keep as `def` (lazy / per-call): the FRAME_LESS_OFFSET /
+ // UNBOUNDED_OFFSET / UNBOUNDED_PRECEDING_OFFSET branches do not read
+ // `processor`. Eager `val` construction would invoke
+ // `AggregateProcessor.apply` on Lag / Lead / NthValue and throw
// `INTERNAL_ERROR: Unsupported aggregate function`.
def processor = if
(functions.exists(_.isInstanceOf[PythonFuncExpression])) {
null
@@ -367,6 +367,7 @@ trait WindowEvaluatorFactoryBase {
val frameTypeOk = frameType match {
case RowFrame => true
case RangeFrame => orderSpec.size == 1
+ case _ => false
}
conf.windowSegmentTreeEnabled &&
frameTypeOk &&
@@ -395,6 +396,12 @@ trait WindowEvaluatorFactoryBase {
case (IntegerLiteral(lo), CurrentRow) => Some(math.abs(lo) + 1)
case _ => None
}
+ // `ceil(W / blockSize)` is the minimum number of blocks a single frame can
+ // straddle; `+ 2` adds one block of slack at each end to cover the case
+ // where the frame's [lower, upper) interval is offset within its leftmost
+ // block (so the cursor temporarily holds the previous block as well) and
+ // the symmetric case at the right edge -- without this slack the LRU
+ // would thrash on the boundary blocks every time the cursor advances.
w.map(ww => math.ceil(ww.toDouble / blockSize).toInt + 2).orElse(Some(8))
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
index ac2a04744fcf..443d160394fa 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala
@@ -87,7 +87,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
if (out.isNullAt(0)) null else out.getInt(0)
}
- test("build and single-point query returns identity; full scan matches
naive") {
+ test("build and single-point query returns identity; full scan matches
naive") {
withTaskContext {
val values = Seq(5, 2, 9, 1, 7, 3, 4, 8, 6, 0)
val tree = buildTree(values, fanout = 4, blockSize = 1024)
@@ -102,7 +102,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("single-block: range query matches naive baseline for random ranges")
{
+ test("single-block: range query matches naive baseline for random ranges") {
withTaskContext {
val rnd = new Random(0xC0FFEE)
val values = Seq.fill(100)(rnd.nextInt(1000))
@@ -119,7 +119,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("fanout boundaries: sizes {1, F, F+1, F*F} for fanout in {2,4,8,16}")
{
+ test("fanout boundaries: sizes {1, F, F+1, F*F} for fanout in {2,4,8,16}") {
withTaskContext {
val rnd = new Random(42)
for (fanout <- Seq(2, 4, 8, 16)) {
@@ -151,7 +151,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("identity at empty range query(k, k)") {
+ test("identity at empty range query(k, k)") {
withTaskContext {
val values = (1 to 50).reverse
val tree = buildTree(values, fanout = 4, blockSize = 16)
@@ -163,7 +163,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("block boundary correctness: cross-block vs single-block baseline") {
+ test("block boundary correctness: cross-block vs single-block baseline") {
withTaskContext {
val rnd = new Random(123)
val values = Seq.fill(100)(rnd.nextInt(10000))
@@ -182,7 +182,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("LRU stability: same queries in different orders produce same
results") {
+ test("LRU stability: same queries in different orders produce same results")
{
withTaskContext {
val rnd = new Random(777)
val values = Seq.fill(100)(rnd.nextInt(10000))
@@ -213,7 +213,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("cross-block: range query matches naive baseline for random ranges") {
+ test("cross-block: range query matches naive baseline for random ranges") {
withTaskContext {
val rnd = new Random(0xBEEF)
val values = Seq.fill(100)(rnd.nextInt(1000))
@@ -231,7 +231,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("cross-block: multi-block level-size invariant") {
+ test("cross-block: multi-block level-size invariant") {
withTaskContext {
val rnd = new Random(31337)
val fanout = 4
@@ -329,7 +329,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("D10 rebuild: second build replaces state; failed build preserves
prior state") {
+ test("D10 rebuild: second build replaces state; failed build preserves prior
state") {
withTaskContext {
val v1 = Seq(5, 1, 9, 3, 7, 2, 8, 4, 6, 0)
val v2 = Seq(100, 200, 50, 400, 25, 600, 12, 800)
@@ -380,7 +380,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
}
}
- test("D11 error paths: invalid ctor args and invalid query ranges") {
+ test("D11 error paths: invalid ctor args and invalid query ranges") {
withTaskContext {
// Constructor validation.
intercept[IllegalArgumentException] {
@@ -503,7 +503,7 @@ class WindowSegmentTreeSuite extends SparkFunSuite with
LocalSparkContext {
math.sqrt(sq / (n - 1))
}
- test("D12 block-aligned cross-block boundaries") {
+ test("D12 block-aligned cross-block boundaries") {
withTaskContext {
val rnd = new Random(12)
val numRows = 50
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]