Repository: spark
Updated Branches:
  refs/heads/master e4fa58c43 -> d601894c0


[SPARK-16335][SQL] Structured streaming should fail if source directory does 
not exist

## What changes were proposed in this pull request?
In structured streaming, Spark does not report an error when the specified 
source directory does not exist. This differs from the behavior in batch mode. 
This patch makes structured streaming fail if the directory does not exist 
(unless the path is a glob pattern, in which case no existence check is made).

## How was this patch tested?
Updated unit tests to reflect the new behavior.

Author: Reynold Xin <[email protected]>

Closes #14002 from rxin/SPARK-16335.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d601894c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d601894c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d601894c

Branch: refs/heads/master
Commit: d601894c0494d415e7f330e02168c43a2dacfb02
Parents: e4fa58c
Author: Reynold Xin <[email protected]>
Authored: Fri Jul 1 15:16:04 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Jul 1 15:16:04 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 10 +++---
 python/pyspark/sql/streaming.py                 | 11 +++---
 .../sql/execution/datasources/DataSource.scala  | 12 +++++++
 .../sql/streaming/FileStreamSourceSuite.scala   | 38 ++++++++------------
 4 files changed, 36 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d601894c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index bb1793d..90c71cc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -232,6 +232,10 @@ class SparkHadoopUtil extends Logging {
     recurse(baseStatus)
   }
 
+  def isGlobPath(pattern: Path): Boolean = {
+    pattern.toString.exists("{}[]*?\\".toSet.contains)
+  }
+
   def globPath(pattern: Path): Seq[Path] = {
     val fs = pattern.getFileSystem(conf)
     Option(fs.globStatus(pattern)).map { statuses =>
@@ -240,11 +244,7 @@ class SparkHadoopUtil extends Logging {
   }
 
   def globPathIfNecessary(pattern: Path): Seq[Path] = {
-    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
-      globPath(pattern)
-    } else {
-      Seq(pattern)
-    }
+    if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d601894c/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index bffe398..8bac347 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -315,7 +315,7 @@ class DataStreamReader(OptionUtils):
 
         >>> json_sdf = spark.readStream.format("json")\
                                        .schema(sdf_schema)\
-                                       
.load(os.path.join(tempfile.mkdtemp(),'data'))
+                                       .load(tempfile.mkdtemp())
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -382,8 +382,7 @@ class DataStreamReader(OptionUtils):
                                           it uses the value specified in
                                           
``spark.sql.columnNameOfCorruptRecord``.
 
-        >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 
'data'), \
-                schema = sdf_schema)
+        >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = 
sdf_schema)
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -411,8 +410,7 @@ class DataStreamReader(OptionUtils):
 
         .. note:: Experimental.
 
-        >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
-                .parquet(os.path.join(tempfile.mkdtemp()))
+        >>> parquet_sdf = 
spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
@@ -512,8 +510,7 @@ class DataStreamReader(OptionUtils):
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted 
records.
 
-        >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 
'data'), \
-                schema = sdf_schema)
+        >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = 
sdf_schema)
         >>> csv_sdf.isStreaming
         True
         >>> csv_sdf.schema == sdf_schema

http://git-wip-us.apache.org/repos/asf/spark/blob/d601894c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a4110d7..6dc27c1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -203,6 +203,18 @@ case class DataSource(
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+
+        // Check whether the path exists if it is not a glob pattern.
+        // For glob pattern, we do not check it because the glob pattern might 
only make sense
+        // once the streaming job starts and some upstream source starts 
dropping data.
+        val hdfsPath = new Path(path)
+        if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+          val fs = 
hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          if (!fs.exists(hdfsPath)) {
+            throw new AnalysisException(s"Path does not exist: $path")
+          }
+        }
+
         val isSchemaInferenceEnabled = 
sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require 
schema to be specified

http://git-wip-us.apache.org/repos/asf/spark/blob/d601894c/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6c04846..8a34cf9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -179,18 +179,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { 
testError() }
   }
 
-  test("FileStreamSource schema: path doesn't exist, no schema") {
-    val e = intercept[IllegalArgumentException] {
-      createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), 
schema = None)
+  test("FileStreamSource schema: path doesn't exist (without schema) should 
throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", 
IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), 
schema = None)
+      }
     }
-    assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema 
absence, not the path
   }
 
-  test("FileStreamSource schema: path doesn't exist, with schema") {
-    val userSchema = new StructType().add(new StructField("value", 
IntegerType))
-    val schema = createFileStreamSourceAndGetSchema(
-      format = None, path = Some("/a/b/c"), schema = Some(userSchema))
-    assert(schema === userSchema)
+  test("FileStreamSource schema: path doesn't exist (with schema) should throw 
exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", 
IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), 
schema = Some(userSchema))
+      }
+    }
   }
 
 
@@ -225,20 +231,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   // =============== Parquet file stream schema tests ================
 
-  ignore("FileStreamSource schema: parquet, no existing files, no schema") {
-    withTempDir { src =>
-      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
-        val e = intercept[AnalysisException] {
-          createFileStreamSourceAndGetSchema(
-            format = Some("parquet"),
-            path = Some(new File(src, "1").getCanonicalPath),
-            schema = None)
-        }
-        assert("Unable to infer schema. It must be specified manually.;" === 
e.getMessage)
-      }
-    }
-  }
-
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
       Seq("a", "b", "c").toDS().as("userColumn").toDF().write


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to