Repository: spark Updated Branches: refs/heads/branch-2.1 cd297c390 -> 3857d5ba8
[SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf ## What changes were proposed in this pull request? The checkpoint location for a Structured Streaming query can be defined on a per-query basis through the `DataStreamWriter` options, but it can also be provided through the SparkSession configuration. A MemorySink in Complete output mode should be able to recover from the checkpoint in both cases. ## How was this patch tested? Unit tests Author: Burak Yavuz <[email protected]> Closes #16342 from brkyvz/chk-rec. (cherry picked from commit caed89321fdabe83e46451ca4e968f86481ad500) Signed-off-by: Shixiong Zhu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3857d5ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3857d5ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3857d5ba Branch: refs/heads/branch-2.1 Commit: 3857d5ba8b195bc1eb4b75f00398535b42164ff1 Parents: cd297c3 Author: Burak Yavuz <[email protected]> Authored: Tue Dec 20 14:19:35 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Tue Dec 20 14:19:41 2016 -0800 ---------------------------------------------------------------------- .../spark/sql/streaming/DataStreamWriter.scala | 2 +- .../test/DataStreamReaderWriterSuite.scala | 32 +++++++++++++++----- 2 files changed, 25 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3857d5ba/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b3c600a..b7fc336 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -223,7 +223,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) val chkpointLoc = extraOptions.get("checkpointLocation") - val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete() + val recoverFromChkpoint = outputMode == OutputMode.Complete() val query = df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), chkpointLoc, http://git-wip-us.apache.org/repos/asf/spark/blob/3857d5ba/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index acac0bf..9de3da3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -470,24 +470,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { sq.stop() } - test("MemorySink can recover from a checkpoint in Complete Mode") { + private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = { import testImplicits._ val ms = new MemoryStream[Int](0, sqlContext) val df = ms.toDF().toDF("a") - val checkpointLoc = newMetadataDir - val checkpointDir = new File(checkpointLoc, "offsets") - checkpointDir.mkdirs() - assert(checkpointDir.exists()) val tableName = "test" def startQuery: StreamingQuery = { - df.groupBy("a") + val writer = df.groupBy("a") .count() 
.writeStream .format("memory") .queryName(tableName) - .option("checkpointLocation", checkpointLoc) .outputMode("complete") - .start() + if (provideInWriter) { + writer.option("checkpointLocation", chkLoc) + } + writer.start() } // no exception here val q = startQuery @@ -513,6 +511,24 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { q2.stop() } + test("MemorySink can recover from a checkpoint in Complete Mode") { + val checkpointLoc = newMetadataDir + val checkpointDir = new File(checkpointLoc, "offsets") + checkpointDir.mkdirs() + assert(checkpointDir.exists()) + testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = true) + } + + test("SPARK-18927: MemorySink can recover from a checkpoint provided in conf in Complete Mode") { + val checkpointLoc = newMetadataDir + val checkpointDir = new File(checkpointLoc, "offsets") + checkpointDir.mkdirs() + assert(checkpointDir.exists()) + withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointLoc) { + testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = false) + } + } + test("append mode memory sink's do not support checkpoint recovery") { import testImplicits._ val ms = new MemoryStream[Int](0, sqlContext) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
