Repository: spark
Updated Branches:
  refs/heads/master b5a997667 -> d85bb10ce


[SPARK-16116][SQL] ConsoleSink should not require checkpointLocation

## What changes were proposed in this pull request?

When the user uses `ConsoleSink`, we should use a temp location if `checkpointLocation` is not specified.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <[email protected]>

Closes #13817 from zsxwing/console-checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d85bb10c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d85bb10c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d85bb10c

Branch: refs/heads/master
Commit: d85bb10ce49926b8b661bd2cb97392205742fc14
Parents: b5a9976
Author: Shixiong Zhu <[email protected]>
Authored: Thu Jun 23 10:46:20 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Thu Jun 23 10:46:20 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala | 8 ++++++++
 .../sql/streaming/test/DataStreamReaderWriterSuite.scala | 10 ++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d85bb10c/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 1977074..d4b0a3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -272,6 +272,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         useTempCheckpointLocation = true,
         trigger = trigger)
     } else {
+      val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
+        if (source == "console") {
+          (true, false)
+        } else {
+          (false, true)
+        }
       val dataSource =
         DataSource(
           df.sparkSession,
@@ -284,6 +290,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         df,
         dataSource.createSink(outputMode),
         outputMode,
+        useTempCheckpointLocation = useTempCheckpointLocation,
+        recoverFromCheckpointLocation = recoverFromCheckpointLocation,
         trigger = trigger)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d85bb10c/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 943e7b7..f099439 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -457,4 +457,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       }
     }
   }
+
+  test("ConsoleSink should not require checkpointLocation") {
+    LastOptions.clear()
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+
+    val sq = df.writeStream.format("console").start()
+    sq.stop()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
