This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2530561c2493 [SPARK-46367][SQL][FOLLOWUP] Assert same arity in projectKeyedPartitionings
2530561c2493 is described below
commit 2530561c24939fb8eaf8237889970203c23b6611
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri May 15 11:41:28 2026 +0800
[SPARK-46367][SQL][FOLLOWUP] Assert same arity in projectKeyedPartitionings
### What changes were proposed in this pull request?
Follow-up to https://github.com/apache/spark/pull/55519.
`PartitioningPreservingUnaryExecNode.projectKeyedPartitionings` assumes all
input `KeyedPartitioning`s (KPs) share the same `partitionKeys`, which implies
the same expression arity. This invariant is asserted by `GroupPartitionsExec`
and is established
by every upstream constructor of `PartitioningCollection` that feeds this
method (a join's `PartitioningCollection(left.outputPartitioning,
right.outputPartitioning)` combines KPs that `EnsureRequirements` has aligned
to the same join keys).
If the invariant is ever violated upstream, indexing `kp.expressions(i)`
for `i >= kp.expressions.length` throws an opaque `IndexOutOfBoundsException`
that points at this method rather than at the producer.
This PR adds an `assert` that surfaces the real cause with a clear message.
The invariant is unchanged; this just turns an opaque failure into a debuggable
one, so a producer-side bug can be fixed at its source.
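
The difference between the two failure modes can be sketched outside Spark. The following is a minimal, self-contained illustration, not the actual planner code: `Kp`, `ArityCheckSketch`, `alternativesUnchecked`, and `alternativesChecked` are hypothetical stand-ins, and plain strings take the place of Catalyst expressions.

```scala
// Hypothetical stand-in for KeyedPartitioning: only the expression list matters here.
final case class Kp(expressions: Seq[String])

object ArityCheckSketch {
  // Collects the expressions at each position, trusting that every Kp has
  // the same arity as the head. A shorter Kp makes expressions(i) throw
  // IndexOutOfBoundsException, which says nothing about the real cause.
  def alternativesUnchecked(kps: Seq[Kp]): Seq[Seq[String]] = {
    if (kps.isEmpty) return Seq.empty
    val numPositions = kps.head.expressions.length
    (0 until numPositions).map(i => kps.map(_.expressions(i)))
  }

  // Same logic, but the arity assumption is checked up front, so a
  // violation fails with a message that lists the observed arities.
  def alternativesChecked(kps: Seq[Kp]): Seq[Seq[String]] = {
    if (kps.isEmpty) return Seq.empty
    val numPositions = kps.head.expressions.length
    assert(kps.tail.forall(_.expressions.length == numPositions),
      "All inputs must share the same expression arity, " +
        s"but got: ${kps.map(_.expressions.length).mkString(", ")}.")
    alternativesUnchecked(kps)
  }
}
```

With `Seq(Kp(Seq("x", "y")), Kp(Seq("x")))` as input, the unchecked variant fails with an `IndexOutOfBoundsException` at position 1, while the checked variant fails with an `AssertionError` whose message reports the arities 2 and 1.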
### Why are the changes needed?
Improve error message when an upstream node violates the same-arity
invariant. No behavior change on valid plans.
### Does this PR introduce _any_ user-facing change?
No (assertion-only; planner internal).
### How was this patch tested?
New unit test `SPARK-46367: mixed-arity KeyedPartitionings in input fail
with a clear assertion` in `ProjectedOrderingAndPartitioningSuite` that
constructs a mixed-arity `PartitioningCollection` child and verifies the assert
fires with the expected message instead of throwing `IndexOutOfBoundsException`.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes #55876 from cloud-fan/SPARK-46367-followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/AliasAwareOutputExpression.scala | 10 ++++++++++
.../ProjectedOrderingAndPartitioningSuite.scala | 23 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index 910cbcf2210a..1f2b1d0a585d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -94,6 +94,16 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
kps: Seq[KeyedPartitioning]): LazyList[KeyedPartitioning] = {
if (kps.isEmpty) return LazyList.empty
val numPositions = kps.head.expressions.length
+ // The function assumes all input KPs share the same `partitionKeys`, which implies matching
+ // expression arity. This invariant is asserted by [[GroupPartitionsExec]] and is established
+ // by the constructors of [[PartitioningCollection]] feeding this method (a join's
+ // `PartitioningCollection(left.outputPartitioning, right.outputPartitioning)` combines KPs
+ // that have been aligned by [[EnsureRequirements]] to the same join keys). If the invariant
+ // is ever violated upstream, fail early with a clear message instead of throwing an opaque
+ // `IndexOutOfBoundsException` from `kp.expressions(i)` below.
+ assert(kps.tail.forall(_.expressions.length == numPositions),
+ s"All input KeyedPartitionings must share the same expression arity, " +
+ s"but got: ${kps.map(_.expressions.length).mkString(", ")}.")
val alternativesPerPosition: IndexedSeq[LazyList[Expression]] =
if (hasAlias) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
index db664b04ef08..a38570924620 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
@@ -586,6 +586,29 @@ class ProjectedOrderingAndPartitioningSuite
case other => fail(s"Expected KeyedPartitioning, got $other")
}
}
+
+ test("SPARK-46367: mixed-arity KeyedPartitionings in input fail with a clear assertion") {
+ // The function assumes all input KPs share the same arity (the invariant asserted by
+ // `GroupPartitionsExec`). Without the assert below, indexing `kp.expressions(i)` for
+ // `i >= kp.expressions.length` would throw an opaque `IndexOutOfBoundsException`. The assert
+ // surfaces the real cause -- an upstream node violated the invariant -- so the bug can be
+ // fixed at the producer.
+ val x = AttributeReference("x", IntegerType)()
+ val y = AttributeReference("y", IntegerType)()
+ val keys2d = Seq(InternalRow(1, 1), InternalRow(2, 2))
+ val keys1d = Seq(InternalRow(1), InternalRow(2))
+ val child = DummyLeafExecWithPartitioning(
+ output = Seq(x, y),
+ partitioning = PartitioningCollection(Seq(
+ KeyedPartitioning(Seq(x, y), keys2d),
+ KeyedPartitioning(Seq(x), keys1d))))
+ val project = ProjectExec(Seq(x), child)
+ val e = intercept[AssertionError] {
+ project.outputPartitioning
+ }
+ assert(e.getMessage.contains("All input KeyedPartitionings must share the same expression " +
+ "arity"))
+ }
}
private case class DummyLeafExecWithPartitioning(