Repository: spark
Updated Branches:
  refs/heads/branch-1.4 faadbd4d9 -> d0bd68ff8


[SPARK-7868] [SQL] Ignores _temporary directories in HadoopFsRelation

So that partial or corrupted data files left by failed tasks/jobs won't
affect normal data scans.

Author: Cheng Lian <[email protected]>

Closes #6411 from liancheng/spark-7868 and squashes the following commits:

273ea36 [Cheng Lian] Ignores _temporary directories

(cherry picked from commit b463e6d618e69c535297e51f41eca4f91bd33cc8)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0bd68ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0bd68ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0bd68ff

Branch: refs/heads/branch-1.4
Commit: d0bd68ff8a1dcfbff8e6d40573ca049d208ab2de
Parents: faadbd4
Author: Cheng Lian <[email protected]>
Authored: Tue May 26 20:48:56 2015 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue May 26 20:49:05 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   | 20 +++++++++++++-------
 .../sql/sources/hadoopFsRelationSuites.scala    | 16 ++++++++++++++++
 2 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0bd68ff/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index aaabbad..c06026e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable
 import org.apache.spark.sql.{Row, _}
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * ::DeveloperApi::
@@ -378,16 +378,22 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
+      // We don't filter files/directories whose name start with "_" or "." here, as specific data
+      // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
+      // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave
+      // partial/corrupted data files there.
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): 
Set[FileStatus] = {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-        files.toSet ++ leafDirs ++ dirs.flatMap(dir => 
listLeafFilesAndDirs(fs, dir))
+        if (status.getPath.getName.toLowerCase == "_temporary") {
+          Set.empty
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          val leafDirs = if (dirs.isEmpty) Set(status) else 
Set.empty[FileStatus]
+          files.toSet ++ leafDirs ++ dirs.flatMap(dir => 
listLeafFilesAndDirs(fs, dir))
+        }
       }
 
       leafFiles.clear()
 
-      // We don't filter files/directories like _temporary/_SUCCESS here, as 
specific data sources
-      // may take advantages over them (e.g. Parquet _metadata and 
_common_metadata files).
       val statuses = paths.flatMap { path =>
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
@@ -395,7 +401,7 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
         
Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs,
 _))
       }
 
-      val (dirs, files) = statuses.partition(_.isDir)
+      val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d0bd68ff/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7c02d56..cf5ae88 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -548,4 +548,20 @@ class ParquetHadoopFsRelationSuite extends 
HadoopFsRelationTest {
       checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
     }
   }
+
+  test("SPARK-7868: _temporary directories should be ignored") {
+    withTempPath { dir =>
+      val df = Seq("a", "b", "c").zipWithIndex.toDF()
+
+      df.write
+        .format("parquet")
+        .save(dir.getCanonicalPath)
+
+      df.write
+        .format("parquet")
+        .save(s"${dir.getCanonicalPath}/_temporary")
+
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), 
df.collect())
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to