This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 92ac08ca4ac6 [SPARK-50205][SQL][TEST] Re-enable
`SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession
are isolated`
92ac08ca4ac6 is described below
commit 92ac08ca4ac684dc357761b25910566736e5ea73
Author: Kousuke Saruta <[email protected]>
AuthorDate: Thu Oct 23 11:33:31 2025 -0700
[SPARK-50205][SQL][TEST] Re-enable
`SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession
are isolated`
### What changes were proposed in this pull request?
This PR aims to reenable
`SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession
are isolated`.
#48736 disabled this test because of it was flaky. In this test, futures
ran on threads managed by `ForkJoinPool`.
Each future invokes `SparkSession#addTag` and `SparkSession#getTag`, and
tags are implemented using `InheritableThreadLocal`. So the root cause of this
issue is same as #52417.
But #48906 replaced `ForkJoinPool` with `Executors.newFixedThreadPool(3)`
so I believe this issue no longer occurs.
In fact, this issue can be reproduced by replacing
`Executors.newFixedThreadPool(3)` with `new ForkJoinPool(3)` and inserting a
sleep like as follows.
```
// global ExecutionContext has only 2 threads in Apache Spark CI
// create own thread pool for four Futures used in this test
- val threadPool = Executors.newFixedThreadPool(3)
+ val threadPool = new ForkJoinPool(3)
...
+ Thread.sleep(1000)
val jobB = Future {
sessionB = globalSession.cloneSession()
import globalSession.implicits._
```
Then, run the test as follows.
```
$ build/sbt 'sql/testOnly
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite -- -z
"Cancellation APIs in Spark\
Session are isolated"'
```
```
info] - Cancellation APIs in SparkSession are isolated *** FAILED *** (2
seconds, 726 milliseconds)
[info] ArrayBuffer({"spark.app.startTime"="1761192376305",
"spark.rdd.scope"="{"id":"3","name":"Exchange"}",
"spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K",
"spark.hadoop.hadoop.caller.context.enabled"="true",
"spark.memory.debugFill"="true", "spark.master.rest.enabled"="false",
"spark.sql.warehouse.dir"="file:/Users/sarutak/oss/spark/sql/core/spark-warehouse",
"spark.master"="local[2]", "spark.job.interruptOnCancel"="true",
"spark.app.name"="test", "spark.driver.host"="19 [...]
[info] org.scalatest.exceptions.TestFailedException:
[info] at
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] at
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info] at
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$13(SparkSessionJobTaggingAndCancellationSuite.scala:229)
[info] at scala.collection.immutable.List.foreach(List.scala:323)
[info] at
org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$6(SparkSessionJobTaggingAndCancellationSuite.scala:226)
[info] at
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info] at
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info] at
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info] at
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
[info] at
org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at
org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
[info] at
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info] at
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:323)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
[info] at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at
org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info] at
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
```
On the other hand, if inserting sleep but leaving
`Executors.newFixedThreadPool(3)` as it is, this test always seems to pass.
So, we can now reenable this test.
### Why are the changes needed?
For better test coverage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The test always passes on my dev environment even if inserting sleep like
explained above.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52704 from sarutak/SPARK-50205.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
index 5ba69c8f9d92..d7b2511eac2a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
@@ -113,8 +113,7 @@ class SparkSessionJobTaggingAndCancellationSuite
activeJob.properties.get(SQLExecution.EXECUTION_ROOT_ID_KEY).asInstanceOf[String].toLong)))
}
- // TODO(SPARK-50205): Re-enable this test case.
- ignore("Cancellation APIs in SparkSession are isolated") {
+ test("Cancellation APIs in SparkSession are isolated") {
sc = new SparkContext("local[2]", "test")
val globalSession =
classic.SparkSession.builder().sparkContext(sc).getOrCreate()
var sessionA: classic.SparkSession = null
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]