This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new b5bc19928440 [SPARK-39328][SQL][TESTS] Fix flaky test `SPARK-37753:
Inhibit broadcast in left outer join when there are many empty partitions on
outer/left side`
b5bc19928440 is described below
commit b5bc199284406e4c9f300993ff51f1d9cc23b7d3
Author: cafri.sun <[email protected]>
AuthorDate: Sun Nov 2 15:18:30 2025 -0800
[SPARK-39328][SQL][TESTS] Fix flaky test `SPARK-37753: Inhibit broadcast in
left outer join when there are many empty partitions on outer/left side`
### What changes were proposed in this pull request?
Improve test `SPARK-37753: Inhibit broadcast in left outer join when there
are many empty partitions on outer/left side` of `AdaptiveQueryExecSuite`
### Why are the changes needed?
This test appears to always succeed in the Apache GitHub Action runner
environment, But some environments, test does not seem to proceed as intended.
On my environment:
`4.18.0-553.8.1.el8_10.x86_64`
`Intel(R) Xeon(R) Silver 4210 CPU 2.20GHz`
`64G Mem`
And ran test in master branch following the guide of official documentation
```
./build/sbt
testOnly org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite
...
- SPARK-37753: Inhibit broadcast in left outer join when there are many
empty partitions on outer/left side *** FAILED ***
The code passed to eventually never returned normally. Attempted 25 times
over 15.040156205999999 seconds. Last failure message:
```
even increasing the test's timeout to 1500 seconds results to failure after
lots of retries.
```
SPARK-37753: Inhibit broadcast in left outer join when there are many empty
partitions on outer/left side *** FAILED ***
The code passed to failAfter did not complete within 20 minutes.
(AdaptiveQueryExecSuite.scala:743)
```
---
The test says
```scala
// if the right side is completed first and the left side is still
being executed,
// the right side does not know whether there are many empty partitions
on the left side,
// so there is no demote, and then the right side is broadcast in the
planning stage.
// so retry several times here to avoid unit test failure.
eventually(timeout(15.seconds), interval(500.milliseconds)) {
...
```
It seems test failure occurs with very high probability by loading the
‘right side’ completes first.
While the reason is unclear, I believe it would be better to regulate the
subquery loading speed in a predictable manner via applying simple udf rather
than retrying until both sides load in the desired order.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Rerun the test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52388 from Last-remote11/SPARK-39328.
Authored-by: cafri.sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e7b7acfd44f0ee9df424d1629060e776fadc1c99)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index cede78c0ef3c..3e7d26f74bd4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -22,7 +22,6 @@ import java.net.URI
import org.apache.logging.log4j.Level
import org.scalatest.PrivateMethodTester
-import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
@@ -747,8 +746,13 @@ class AdaptiveQueryExecSuite
// if the right side is completed first and the left side is still being
executed,
// the right side does not know whether there are many empty partitions on
the left side,
// so there is no demote, and then the right side is broadcast in the
planning stage.
- // so retry several times here to avoid unit test failure.
- eventually(timeout(15.seconds), interval(500.milliseconds)) {
+ // so apply `slow_udf` to delay right side to avoid unit test failure.
+ withUserDefinedFunction("slow_udf" -> true) {
+ spark.udf.register("slow_udf", (x: Int) => {
+ Thread.sleep(300)
+ x
+ })
+
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
@@ -756,7 +760,8 @@ class AdaptiveQueryExecSuite
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM (select * from testData where value = '1') td" +
- " left outer join testData2 ON key = a")
+ " left outer join (select slow_udf(a) as a, b from testData2) as
td2" +
+ " ON td.key = td2.a")
val smj = findTopLevelSortMergeJoin(plan)
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]