Repository: spark
Updated Branches:
  refs/heads/branch-2.0 36f711dc6 -> 1e7d8ba5d


[SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard

## What changes were proposed in this pull request?
Make StreamingContext.textFileStream support wildcards in the monitored path,
for example /home/user/*/file

## How was this patch tested?
I tested manually and added a new unit test case.

Author: mwws <[email protected]>
Author: unknown <[email protected]>

Closes #12752 from mwws/SPARK_FileStream.

(cherry picked from commit 33597810ec256cd9bd363bad9239cc6d5b707a6f)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e7d8ba5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e7d8ba5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e7d8ba5

Branch: refs/heads/branch-2.0
Commit: 1e7d8ba5d6212c6e1e57a48f56d68c03c7386e66
Parents: 36f711d
Author: mwws <[email protected]>
Authored: Wed May 11 10:46:58 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Wed May 11 10:47:36 2016 +0100

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    | 10 +++-
 .../spark/streaming/InputStreamsSuite.scala     | 62 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e7d8ba5/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 36f50e0..ed93058 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       )
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
-      val filter = new PathFilter {
+
+      val newFileFilter = new PathFilter {
         def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
       }
-      val newFiles = fs.listStatus(directoryPath, 
filter).map(_.getPath.toString)
+      val directoryFilter = new PathFilter {
+        override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
+      }
+      val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+      val newFiles = directories.flatMap(dir =>
+        fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
       val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/1e7d8ba5/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6b4c15f..00d506c 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
     testFileStream(newFilesOnly = false)
   }
 
+  test("file input stream - wildcard") {
+    // A glob such as "testDir/*/" must pick up new files from every matching sub-directory.
+    var testDir: File = null
+    try {
+      val batchDuration = Seconds(2)
+      testDir = Utils.createTempDir()
+      val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
+      val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
+
+      // Create a file that exists before the StreamingContext is created (it seeds the clock):
+      val existingFile = new File(testDir, "0")
+      Files.write("0\n", existingFile, StandardCharsets.UTF_8)
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+      val pathWithWildCard = testDir.toString + "/*/"
+
+      // Set up the streaming context and input streams
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+        val batchCounter = new BatchCounter(ssc)
+        // monitor "testDir/*/", i.e. both tmp1 and tmp2
+        val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+          pathWithWildCard).map(_._2.toString)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputQueue)
+        outputStream.register()
+        ssc.start()
+
+        // Advance the clock so that the files are created after StreamingContext starts, but
+        // not enough to trigger a batch
+        clock.advance(batchDuration.milliseconds / 2)
+
+        def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
+          val file = new File(dir, data.toString)
+          Files.write(s"$data\n", file, StandardCharsets.UTF_8)
+          assert(file.setLastModified(clock.getTimeMillis()))
+          assert(file.lastModified === clock.getTimeMillis())
+          logInfo("Created file " + file)
+          // Advance the clock after creating the file to avoid a race when
+          // setting its modification time; file `data` completes batch number `data`
+          clock.advance(batchDuration.milliseconds)
+          eventually(eventuallyTimeout) {
+            assert(batchCounter.getNumCompletedBatches === data)
+          }
+        }
+        // Over time, create files in the first matching sub-directory (tmp1)
+        val input1 = Seq(1, 2, 3, 4, 5)
+        input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
+
+        // Over time, create files in the second matching sub-directory (tmp2)
+        val input2 = Seq(6, 7, 8, 9, 10)
+        input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
+
+        // Verify that all the files, from both sub-directories, have been read
+        val expectedOutput = (input1 ++ input2).map(_.toString).toSet
+        assert(outputQueue.asScala.flatten.toSet === expectedOutput)
+      }
+    } finally {
+      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
+
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to