Repository: spark
Updated Branches:
  refs/heads/master 86475520f -> d9ca9fd3e


[SPARK-14837][SQL][STREAMING] Added support in file stream source for reading 
new files added to subdirs

## What changes were proposed in this pull request?
Currently, file stream source can only find new files if they appear in the 
directory given to the source, but not if they appear in subdirs. This PR adds 
support for providing glob patterns when creating file stream source so that it 
can find new files in nested directories based on the glob pattern.

## How was this patch tested?

Unit tests that verify new files are discovered with globs and in partitioned 
directories.

Author: Tathagata Das <[email protected]>

Closes #12616 from tdas/SPARK-14837.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9ca9fd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9ca9fd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9ca9fd3

Branch: refs/heads/master
Commit: d9ca9fd3e582f9d29f8887c095637c93a8b93651
Parents: 8647552
Author: Tathagata Das <[email protected]>
Authored: Tue May 10 16:43:32 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue May 10 16:43:32 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 18 +---
 .../datasources/ListingFileCatalog.scala        |  1 +
 .../PartitioningAwareFileCatalog.scala          |  4 +-
 .../execution/streaming/FileStreamSource.scala  | 40 ++++++---
 .../sql/streaming/FileStreamSourceSuite.scala   | 88 ++++++++++++++++++--
 5 files changed, 114 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca9fd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0342ec5..ce45168 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -174,25 +174,11 @@ case class DataSource(
         s.createSource(sparkSession.wrapped, metadataPath, 
userSpecifiedSchema, className, options)
 
       case format: FileFormat =>
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val path = caseInsensitiveOptions.getOrElse("path", {
+        val path = new CaseInsensitiveMap(options).getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
-
-        def dataFrameBuilder(files: Array[String]): DataFrame = {
-          val newOptions = options.filterKeys(_ != "path") + ("basePath" -> 
path)
-          val newDataSource =
-            DataSource(
-              sparkSession,
-              paths = files,
-              userSpecifiedSchema = Some(sourceInfo.schema),
-              className = className,
-              options = new CaseInsensitiveMap(newOptions))
-          Dataset.ofRows(sparkSession, 
LogicalRelation(newDataSource.resolveRelation()))
-        }
-
         new FileStreamSource(
-          sparkSession, metadataPath, path, sourceInfo.schema, 
dataFrameBuilder)
+          sparkSession, path, className, sourceInfo.schema, metadataPath, 
options)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")

http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca9fd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index bdf43e0..5cee2b9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -53,6 +53,7 @@ class ListingFileCatalog(
     if (cachedPartitionSpec == null) {
       cachedPartitionSpec = inferPartitioning()
     }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
     cachedPartitionSpec
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca9fd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 5f04a6c..27f23c8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -49,7 +49,7 @@ abstract class PartitioningAwareFileCatalog(
   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
-    if (partitionSpec().partitionColumns.isEmpty) {
+    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName 
startsWith "_")) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
@@ -59,6 +59,8 @@ abstract class PartitioningAwareFileCatalog(
             leafDirToChildrenFiles(path).filterNot(_.getPath.getName 
startsWith "_"))
       }
     }
+    logTrace("Selected files after partition pruning:\n\t" + 
selectedPartitions.mkString("\n\t"))
+    selectedPartitions
   }
 
   override def allFiles(): Seq[FileStatus] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca9fd3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 7b4c035..bef5616 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, 
DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -33,12 +35,14 @@ import org.apache.spark.util.collection.OpenHashSet
  */
 class FileStreamSource(
     sparkSession: SparkSession,
-    metadataPath: String,
     path: String,
+    fileFormatClassName: String,
     override val schema: StructType,
-    dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
+    metadataPath: String,
+    options: Map[String, String]) extends Source with Logging {
 
   private val fs = new 
Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+  private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can 
contain glob patterns
   private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, 
metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
@@ -69,6 +73,7 @@ class FileStreamSource(
     if (newFiles.nonEmpty) {
       maxBatchId += 1
       metadataLog.add(maxBatchId, newFiles)
+      logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} 
new files")
     }
 
     new LongOffset(maxBatchId)
@@ -97,21 +102,30 @@ class FileStreamSource(
     assert(startId <= endId)
     val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
     logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
-    logDebug(s"Streaming ${files.mkString(", ")}")
-    dataFrameBuilder(files)
+    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
+    val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
+    val newDataSource =
+      DataSource(
+        sparkSession,
+        paths = files,
+        userSpecifiedSchema = Some(schema),
+        className = fileFormatClassName,
+        options = newOptions)
+    Dataset.ofRows(sparkSession, 
LogicalRelation(newDataSource.resolveRelation()))
   }
 
   private def fetchAllFiles(): Seq[String] = {
-    val startTime = System.nanoTime()
-    val files = fs.listStatus(new Path(path))
-      .filterNot(_.getPath.getName.startsWith("_"))
-      .map(_.getPath.toUri.toString)
-    val endTime = System.nanoTime()
-    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 
1000000}ms")
+    val startTime = System.nanoTime
+    val globbedPaths = 
SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+    val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, 
Some(new StructType))
+    val files = catalog.allFiles().map(_.getPath.toUri.toString)
+    val endTime = System.nanoTime
+    logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 
1000000}ms")
+    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
     files
   }
 
   override def getOffset: Option[Offset] = 
Some(fetchMaxOffset()).filterNot(_.offset == -1)
 
-  override def toString: String = s"FileSource[$path]"
+  override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca9fd3/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 4b95d65..c97304c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 class FileStreamSourceTest extends StreamTest with SharedSQLContext {
@@ -58,7 +58,7 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
         addData(source)
         source.currentOffset + 1
       }
-      logInfo(s"Added data to $source at offset $newOffset")
+      logInfo(s"Added file to $source at offset $newOffset")
       (source, newOffset)
     }
 
@@ -69,8 +69,11 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
     extends AddFileData {
 
     override def addData(source: FileStreamSource): Unit = {
-      val file = Utils.tempFileWith(new File(tmp, "text"))
-      stringToFile(file, content).renameTo(new File(src, file.getName))
+      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+      val finalFile = new File(src, tempFile.getName)
+      src.mkdirs()
+      require(stringToFile(tempFile, content).renameTo(finalFile))
+      logInfo(s"Written text '$content' to file $finalFile")
     }
   }
 
@@ -89,6 +92,7 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
     def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
       val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
       df.write.parquet(tmpDir.getCanonicalPath)
+      src.mkdirs()
       tmpDir.listFiles().foreach { f =>
         f.renameTo(new File(src, s"${f.getName}"))
       }
@@ -100,7 +104,6 @@ class FileStreamSourceTest extends StreamTest with 
SharedSQLContext {
       format: String,
       path: String,
       schema: Option[StructType] = None): DataFrame = {
-
     val reader =
       if (schema.isDefined) {
         spark.read.format(format).schema(schema.get)
@@ -327,7 +330,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
with SharedSQLContext {
     }
   }
 
-
   test("reading from json files inside partitioned directory") {
     withTempDirs { case (baseSrc, tmp) =>
       val src = new File(baseSrc, "type=X")
@@ -348,7 +350,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
with SharedSQLContext {
     }
   }
 
-
   test("reading from json files with changing schema") {
     withTempDirs { case (src, tmp) =>
 
@@ -444,6 +445,79 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
with SharedSQLContext {
     }
   }
 
+  test("read new files in nested directories with globbing") {
+    withTempDirs { case (dir, tmp) =>
+
+      // src/*/* should consider all the files and directories that match 
that glob.
+      // So any files that match the glob as well as any files in 
directories that match
+      // this glob should be read.
+      val fileStream = createFileStream("text", s"${dir.getCanonicalPath}/*/*")
+      val filtered = fileStream.filter($"value" contains "keep")
+      val subDir = new File(dir, "subdir")
+      val subSubDir = new File(subDir, "subsubdir")
+      val subSubSubDir = new File(subSubDir, "subsubsubdir")
+
+      require(!subDir.exists())
+      require(!subSubDir.exists())
+
+      testStream(filtered)(
+        // Create new dir/subdir and write to it, should read
+        AddTextFileData("drop1\nkeep2", subDir, tmp),
+        CheckAnswer("keep2"),
+
+        // Add files to dir/subdir, should read
+        AddTextFileData("keep3", subDir, tmp),
+        CheckAnswer("keep2", "keep3"),
+
+        // Create new dir/subdir/subsubdir and write to it, should read
+        AddTextFileData("keep4", subSubDir, tmp),
+        CheckAnswer("keep2", "keep3", "keep4"),
+
+        // Add files to dir/subdir/subsubdir, should read
+        AddTextFileData("keep5", subSubDir, tmp),
+        CheckAnswer("keep2", "keep3", "keep4", "keep5"),
+
+        // 1. Add file to src dir, should not read as globbing src/*/* does 
not capture files in
+        //    dir, only captures files in dir/subdir/
+        // 2. Add files to dir/subDir/subsubdir/subsubsubdir, should not read 
as src/*/* should
+        //    not capture those files
+        AddTextFileData("keep6", dir, tmp),
+        AddTextFileData("keep7", subSubSubDir, tmp),
+        AddTextFileData("keep8", subDir, tmp), // needed to make query detect 
new data
+        CheckAnswer("keep2", "keep3", "keep4", "keep5", "keep8")
+      )
+    }
+  }
+
+  test("read new files in partitioned table with globbing, should not read 
partition data") {
+    withTempDirs { case (dir, tmp) =>
+      val partitionFooSubDir = new File(dir, "partition=foo")
+      val partitionBarSubDir = new File(dir, "partition=bar")
+
+      val schema = new StructType().add("value", StringType).add("partition", 
StringType)
+      val fileStream = createFileStream("json", 
s"${dir.getCanonicalPath}/*/*", Some(schema))
+      val filtered = fileStream.filter($"value" contains "keep")
+      val nullStr = null.asInstanceOf[String]
+      testStream(filtered)(
+        // Create new partition=foo sub dir and write to it, should read only 
value, not partition
+        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", 
partitionFooSubDir, tmp),
+        CheckAnswer(("keep2", nullStr)),
+
+        // Append to same partition=foo sub dir, should read only value, not 
partition
+        AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+        CheckAnswer(("keep2", nullStr), ("keep3", nullStr)),
+
+        // Create new partition sub dir and write to it, should read only 
value, not partition
+        AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+        CheckAnswer(("keep2", nullStr), ("keep3", nullStr), ("keep4", 
nullStr)),
+
+        // Append to same partition=bar sub dir, should read only value, not 
partition
+        AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+        CheckAnswer(("keep2", nullStr), ("keep3", nullStr), ("keep4", 
nullStr), ("keep5", nullStr))
+      )
+    }
+  }
+
   test("fault tolerance") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("text", src.getCanonicalPath)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to