This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ec45d10 [SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
ec45d10 is described below
commit ec45d10d26621a0541d937bf6850e153b6cd426b
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Sep 11 11:48:34 2020 -0700
[SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite
### What changes were proposed in this pull request?

This PR aims to add `sinkParameter` to check sink options robustly and independently in `DataStreamReaderWriterSuite`.
### Why are the changes needed?

`LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, and `createSink`. However, `StreamingQuery.stop` invokes `queryExecutionThread.join` and `runStream`, which call `createSource` immediately and reset the options that `createSink` stored.
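The root cause is that the source and sink callbacks write to the same field, so whichever runs last wins. A minimal sketch of the conflict, assuming the shared-field shape shown in the diff below (`SharedFieldDemo` and the literal option maps are hypothetical, not the actual test code):

```scala
object LastOptions {
  // Before this patch: one shared slot for both source and sink callbacks.
  var parameters: Map[String, String] = null
}

object SharedFieldDemo extends App {
  // createSink stores the sink options...
  LastOptions.parameters = Map("opt1" -> "5")

  // ...but createSource runs again around query shutdown and overwrites them,
  // so a later assertion on the sink options becomes flaky.
  LastOptions.parameters = Map("sourceOption" -> "x")

  assert(!LastOptions.parameters.contains("opt1")) // the sink options are gone
}
```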
To catch the `createSink` options, the test suite currently resorts to the workaround pattern shown below. However, we have observed occasional flakiness with this pattern. If we store the `createSink` options in a separate field, the workaround becomes unnecessary and the flakiness disappears.
```scala
val query = df.writeStream
  ...
  .start()
assert(LastOptions.parameters(..))
query.stop()
```
### Does this PR introduce _any_ user-facing change?

No. This is a test-only change.
### How was this patch tested?

Pass the newly updated test case.
Closes #29730 from dongjoon-hyun/SPARK-32845.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b4be6a6d12bf62f02cffe0bcc97ef32d27827d57)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../test/DataStreamReaderWriterSuite.scala | 23 +++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index d90af35..8bf7e27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
var parameters: Map[String, String] = null
+ var sinkParameters: Map[String, String] = null
var schema: Option[StructType] = null
var partitionColumns: Seq[String] = Nil
def clear(): Unit = {
parameters = null
+ sinkParameters = null
schema = null
partitionColumns = null
reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
- LastOptions.parameters = parameters
+ LastOptions.sinkParameters = parameters
LastOptions.partitionColumns = partitionColumns
LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
(_: Long, _: DataFrame) => {}
@@ -169,20 +171,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
LastOptions.clear()
- val query = df.writeStream
+ df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("opt1", "5")
.options(Map("opt2" -> "4"))
.options(map)
.option("checkpointLocation", newMetadataDir)
.start()
+ .stop()
- assert(LastOptions.parameters("opt1") == "5")
- assert(LastOptions.parameters("opt2") == "4")
- assert(LastOptions.parameters("opt3") == "3")
- assert(LastOptions.parameters.contains("checkpointLocation"))
-
- query.stop()
+ assert(LastOptions.sinkParameters("opt1") == "5")
+ assert(LastOptions.sinkParameters("opt2") == "4")
+ assert(LastOptions.sinkParameters("opt3") == "3")
+ assert(LastOptions.sinkParameters.contains("checkpointLocation"))
}
test("SPARK-32832: later option should override earlier options for load()")
{
@@ -203,7 +204,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.load()
assert(LastOptions.parameters.isEmpty)
- val query = ds.writeStream
+ ds.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.option("paTh", "1")
@@ -212,8 +213,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.option("patH", "4")
.option("path", "5")
.start()
- assert(LastOptions.parameters("path") == "5")
- query.stop()
+ .stop()
+ assert(LastOptions.sinkParameters("path") == "5")
}
test("partitioning") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]