This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7d7afb06f68 [SPARK-38723][SS][TESTS] Add test for streaming query
resume race condition
7d7afb06f68 is described below
commit 7d7afb06f682c10f3900eb8adeab9fad6d49cb24
Author: Phil Dakin <[email protected]>
AuthorDate: Thu Oct 26 14:24:09 2023 +0900
[SPARK-38723][SS][TESTS] Add test for streaming query resume race condition
### What changes were proposed in this pull request?
Add a test for the CONCURRENT_QUERY error raised when multiple sessions try
to resume the same streaming query simultaneously from a checkpoint.
### Why are the changes needed?
Improve testing coverage per
https://issues.apache.org/jira/browse/SPARK-38723.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The change is itself a test - ran it locally and confirmed the suite passes.
```
[info] All tests passed.
[success] Total time: 129 s (02:09), completed Oct 17, 2023, 2:11:34 PM
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43405 from PhilDakin/pdakin.SPARK-38723.
Authored-by: Phil Dakin <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/errors/QueryExecutionErrorsSuite.scala | 48 ++++++++++++++++++++++
1 file changed, 48 insertions(+)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 78bbabb1a3f..fb1d05f2a9a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -25,6 +25,7 @@ import java.util.{Locale, Properties,
ServiceConfigurationError}
import org.apache.hadoop.fs.{LocalFileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.mockito.Mockito.{mock, spy, when}
+import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest,
Row, SaveMode}
@@ -49,6 +50,7 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DataType, DecimalType, LongType,
MetadataBuilder, StructType}
+import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.Utils
class QueryExecutionErrorsSuite
@@ -876,6 +878,52 @@ class QueryExecutionErrorsSuite
assert(e.getCause.isInstanceOf[NullPointerException])
}
+ test("CONCURRENT_QUERY: streaming query is resumed from many sessions") {
+ failAfter(90 seconds) {
+ withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+ withTempDir { dir =>
+ val ds = spark.readStream.format("rate").load()
+
+ // Queries have the same ID when they are resumed from the same
checkpoint.
+ val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+ val dataLocation = new File(dir, "data").getCanonicalPath
+
+ // Run an initial query to setup the checkpoint.
+ val initialQuery = ds.writeStream.format("parquet")
+ .option("checkpointLocation", chkLocation).start(dataLocation)
+
+ // Error is thrown due to a race condition. Ensure it happens with
high likelihood in the
+ // test by spawning many threads.
+ val exceptions = ThreadUtils.parmap(Seq.range(1, 50),
"QueryExecutionErrorsSuite", 50)
+ { _ =>
+ var exception = None :
Option[SparkConcurrentModificationException]
+ try {
+ val restartedQuery = ds.writeStream.format("parquet")
+ .option("checkpointLocation",
chkLocation).start(dataLocation)
+ restartedQuery.stop()
+ restartedQuery.awaitTermination()
+ } catch {
+ case e: SparkConcurrentModificationException =>
+ exception = Some(e)
+ }
+ exception
+ }
+ assert(exceptions.map(e => e.isDefined).reduceLeft(_ || _))
+ exceptions.map { e =>
+ if (e.isDefined) {
+ checkError(
+ e.get,
+ errorClass = "CONCURRENT_QUERY",
+ sqlState = Some("0A000")
+ )
+ }
+ }
+ spark.streams.active.foreach(_.stop())
+ }
+ }
+ }
+ }
+
test("UNSUPPORTED_EXPR_FOR_WINDOW: to_date is not supported with WINDOW") {
withTable("t") {
sql("CREATE TABLE t(c String) USING parquet")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]