This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 92ac08ca4ac6 [SPARK-50205][SQL][TEST] Re-enable 
`SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession 
are isolated`
92ac08ca4ac6 is described below

commit 92ac08ca4ac684dc357761b25910566736e5ea73
Author: Kousuke Saruta <[email protected]>
AuthorDate: Thu Oct 23 11:33:31 2025 -0700

    [SPARK-50205][SQL][TEST] Re-enable 
`SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession 
are isolated`
    
    ### What changes were proposed in this pull request?
    This PR aims to re-enable 
`SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession 
are isolated`.
    
    #48736 disabled this test because it was flaky. In this test, futures ran 
on threads managed by a `ForkJoinPool`.
    Each future invokes `SparkSession#addTag` and `SparkSession#getTag`, and 
tags are implemented using `InheritableThreadLocal`, so the root cause of this 
issue is the same as #52417.
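    
    As background (not part of this patch), here is a minimal Java sketch of 
the `InheritableThreadLocal` semantics that make tag propagation unreliable on 
reused pool threads: a worker thread inherits the parent's value only at the 
moment the thread is created, so updates made by the parent after the worker 
already exists are never observed. The class and method names below are 
illustrative only.
    
    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class InheritableDemo {
        static final InheritableThreadLocal<String> TAG =
            new InheritableThreadLocal<>();
    
        // Returns the values the single pooled worker observes before and
        // after the parent thread updates the InheritableThreadLocal.
        static String[] demo() throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            try {
                TAG.set("tag-A");
                // The worker thread is created lazily on the first submit and
                // inherits "tag-A" at creation time.
                String first = pool.submit(TAG::get).get();
    
                TAG.set("tag-B");
                // The same worker thread is reused, so it still sees the value
                // it inherited when created, not the parent's current value.
                String second = pool.submit(TAG::get).get();
                return new String[] {first, second};
            } finally {
                pool.shutdown();
            }
        }
    
        public static void main(String[] args) throws Exception {
            String[] r = demo();
            System.out.println(r[0] + " " + r[1]); // prints "tag-A tag-A"
        }
    }
    ```
    
    With a `ForkJoinPool`, which creates and parks worker threads on its own 
schedule, the inherited tag set is even less predictable, which matches the 
flakiness described above.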
    
    However, #48906 replaced the `ForkJoinPool` with 
`Executors.newFixedThreadPool(3)`, so I believe this issue no longer occurs.
    In fact, the issue can still be reproduced by replacing 
`Executors.newFixedThreadPool(3)` with `new ForkJoinPool(3)` and inserting a 
sleep, as follows.
    
    ```
         // global ExecutionContext has only 2 threads in Apache Spark CI
         // create own thread pool for four Futures used in this test
    -    val threadPool = Executors.newFixedThreadPool(3)
    +    val threadPool = new ForkJoinPool(3)
    
    ...
    
    +      Thread.sleep(1000)
           val jobB = Future {
             sessionB = globalSession.cloneSession()
             import globalSession.implicits._
    ```
    
    Then, run the test as follows.
    ```
    $ build/sbt 'sql/testOnly 
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite -- -z 
"Cancellation APIs in Spark\
    Session are isolated"'
    ```
    
    ```
    [info] - Cancellation APIs in SparkSession are isolated *** FAILED *** (2 
seconds, 726 milliseconds)
    [info]   ArrayBuffer({"spark.app.startTime"="1761192376305", 
"spark.rdd.scope"="{"id":"3","name":"Exchange"}", 
"spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K", 
"spark.hadoop.hadoop.caller.context.enabled"="true", 
"spark.memory.debugFill"="true", "spark.master.rest.enabled"="false", 
"spark.sql.warehouse.dir"="file:/Users/sarutak/oss/spark/sql/core/spark-warehouse",
 "spark.master"="local[2]", "spark.job.interruptOnCancel"="true", 
"spark.app.name"="test", "spark.driver.host"="19 [...]
    [info]   org.scalatest.exceptions.TestFailedException:
    [info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
    [info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
    [info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
    [info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
    [info]   at 
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$13(SparkSessionJobTaggingAndCancellationSuite.scala:229)
    [info]   at scala.collection.immutable.List.foreach(List.scala:323)
    [info]   at 
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$6(SparkSessionJobTaggingAndCancellationSuite.scala:226)
    [info]   at 
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
    [info]   at 
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
    [info]   at 
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
    [info]   at 
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
    [info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
    [info]   at 
org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
    [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
    [info]   at 
org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
    [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
    [info]   at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
    [info]   at 
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
    [info]   at 
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
    [info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
    [info]   at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    [info]   at scala.collection.immutable.List.foreach(List.scala:323)
    [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
    [info]   at 
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
    [info]   at org.scalatest.Suite.run(Suite.scala:1114)
    [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
    [info]   at 
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
    [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
    [info]   at 
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
    [info]   at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
    [info]   at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    [info]   at 
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
    [info]   at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
    [info]   at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
    [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
    [info]   at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    [info]   at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    [info]   at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    [info]   at java.base/java.lang.Thread.run(Thread.java:840)
    ```
    
    On the other hand, if the sleep is inserted but 
`Executors.newFixedThreadPool(3)` is left as is, the test always seems to pass.
    So we can now re-enable this test.
    
    ### Why are the changes needed?
    For better test coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    The test always passes in my dev environment, even with the sleep inserted 
as explained above.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52704 from sarutak/SPARK-50205.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
index 5ba69c8f9d92..d7b2511eac2a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
@@ -113,8 +113,7 @@ class SparkSessionJobTaggingAndCancellationSuite
         
activeJob.properties.get(SQLExecution.EXECUTION_ROOT_ID_KEY).asInstanceOf[String].toLong)))
   }
 
-  // TODO(SPARK-50205): Re-enable this test case.
-  ignore("Cancellation APIs in SparkSession are isolated") {
+  test("Cancellation APIs in SparkSession are isolated") {
     sc = new SparkContext("local[2]", "test")
     val globalSession = 
classic.SparkSession.builder().sparkContext(sc).getOrCreate()
     var sessionA: classic.SparkSession = null


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
