This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new b5bc19928440 [SPARK-39328][SQL][TESTS] Fix flaky test `SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side`
b5bc19928440 is described below

commit b5bc199284406e4c9f300993ff51f1d9cc23b7d3
Author: cafri.sun <[email protected]>
AuthorDate: Sun Nov 2 15:18:30 2025 -0800

    [SPARK-39328][SQL][TESTS] Fix flaky test `SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side`
    
    ### What changes were proposed in this pull request?
    
    Improve test `SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side` of `AdaptiveQueryExecSuite`.
    
    ### Why are the changes needed?
    
    This test appears to always succeed in the Apache GitHub Actions runner environment, but in some environments the test does not proceed as intended.
    
    On my environment:
    `4.18.0-553.8.1.el8_10.x86_64`
    `Intel(R) Xeon(R) Silver 4210 CPU @ 2.20GHz`
    `64G Mem`
    I ran the test on the master branch following the official documentation:
    ```
    ./build/sbt
    testOnly org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite
    ...
    - SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side *** FAILED ***
      The code passed to eventually never returned normally. Attempted 25 times over 15.040156205999999 seconds. Last failure message:
    ```
    Even increasing the test's timeout to 1500 seconds results in failure after many retries:
    ```
    SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side *** FAILED ***
      The code passed to failAfter did not complete within 20 minutes. (AdaptiveQueryExecSuite.scala:743)
    ```
    
    ---
    
    The test says
    ```scala
        // if the right side is completed first and the left side is still being executed,
        // the right side does not know whether there are many empty partitions on the left side,
        // so there is no demote, and then the right side is broadcast in the planning stage.
        // so retry several times here to avoid unit test failure.
        eventually(timeout(15.seconds), interval(500.milliseconds)) {
    ...
    ```
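    For context, scalatest's `eventually` simply re-runs the block on an interval until it stops throwing or the timeout elapses, which is why the failure above reads "Attempted 25 times over 15.04... seconds". A minimal sketch of that retry behavior (not the suite's code; it only assumes scalatest is on the classpath and a placeholder assertion):
    ```scala
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    // Re-execute the block roughly every 500 ms until it passes or 15 seconds elapse;
    // if the timeout is reached, the last failure is reported.
    eventually(timeout(15.seconds), interval(500.milliseconds)) {
      assert(System.currentTimeMillis() > 0) // placeholder for the real assertions
    }
    ```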
    It seems the test fails with very high probability when the ‘right side’ finishes loading first.
    
    While the root cause is unclear, I believe it is better to regulate the subquery loading speed in a predictable manner by applying a simple UDF, rather than retrying until both sides load in the desired order.
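    A self-contained sketch of the idea (illustrative only; the table names, the SQL shape, and the 300 ms sleep mirror the actual change in the diff below, while the session setup and object name are made up for this example):
    ```scala
    import org.apache.spark.sql.SparkSession

    object SlowRightSideSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("slow-udf-sketch")
          .getOrCreate()
        import spark.implicits._

        // A UDF that sleeps per row, so the right side predictably finishes
        // after the left side has already reported its (mostly empty) partitions.
        spark.udf.register("slow_udf", (x: Int) => { Thread.sleep(300); x })

        Seq((1, "1"), (2, "2")).toDF("key", "value").createOrReplaceTempView("testData")
        Seq((1, 1), (2, 2)).toDF("a", "b").createOrReplaceTempView("testData2")

        spark.sql(
          "SELECT * FROM (SELECT * FROM testData WHERE value = '1') td " +
            "LEFT OUTER JOIN (SELECT slow_udf(a) AS a, b FROM testData2) td2 " +
            "ON td.key = td2.a").show()

        spark.stop()
      }
    }
    ```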
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Reran the test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52388 from Last-remote11/SPARK-39328.
    
    Authored-by: cafri.sun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit e7b7acfd44f0ee9df424d1629060e776fadc1c99)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala     | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index cede78c0ef3c..3e7d26f74bd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -22,7 +22,6 @@ import java.net.URI
 
 import org.apache.logging.log4j.Level
 import org.scalatest.PrivateMethodTester
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
@@ -747,8 +746,13 @@ class AdaptiveQueryExecSuite
     // if the right side is completed first and the left side is still being executed,
     // the right side does not know whether there are many empty partitions on the left side,
     // so there is no demote, and then the right side is broadcast in the planning stage.
-    // so retry several times here to avoid unit test failure.
-    eventually(timeout(15.seconds), interval(500.milliseconds)) {
+    // so apply `slow_udf` to delay right side to avoid unit test failure.
+    withUserDefinedFunction("slow_udf" -> true) {
+      spark.udf.register("slow_udf", (x: Int) => {
+        Thread.sleep(300)
+        x
+      })
+
       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
@@ -756,7 +760,8 @@ class AdaptiveQueryExecSuite
         withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
           val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
             "SELECT * FROM (select * from testData where value = '1') td" +
-              " left outer join testData2 ON key = a")
+              " left outer join (select slow_udf(a) as a, b from testData2) as 
td2" +
+              " ON td.key = td2.a")
           val smj = findTopLevelSortMergeJoin(plan)
           assert(smj.size == 1)
           val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
