Repository: spark
Updated Branches:
  refs/heads/branch-1.3 5b893bd60 -> 3a41a1327


[SPARK-7155] [CORE] Allow newAPIHadoopFile to support comma-separated list of 
files as input

See JIRA: https://issues.apache.org/jira/browse/SPARK-7155

SparkContext's newAPIHadoopFile() does not support a comma-separated list of
files. For example, the following:
```scala
sc.newAPIHadoopFile("/root/file1.txt,/root/file2.txt", 
classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
```
will throw
```
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/root/file1.txt,/root/file2.txt
```
However, the other API, hadoopFile(), handles a comma-separated list of files
correctly. In addition, since sc.textFile() uses hadoopFile(), it also handles
a comma-separated list of files correctly.

That means the behaviors of hadoopFile() and newAPIHadoopFile() are not aligned.
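
For comparison, a minimal sketch of the old-API calls that already accept the
same comma-separated string (the paths are illustrative, and TextInputFormat
here is the org.apache.hadoop.mapred one):

```scala
// Both calls already split the comma-separated string into two input files.
sc.hadoopFile("/root/file1.txt,/root/file2.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
sc.textFile("/root/file1.txt,/root/file2.txt")
```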

This pull request fixes the issue and allows newAPIHadoopFile() to accept a
comma-separated list of files as input.
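
At its core, the change swaps one Hadoop call. A minimal sketch of the idea,
assuming Hadoop 2.x's Job API (this is not the exact patch, which follows in
the diff below):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val job = Job.getInstance(new Configuration())
val path = "/root/file1.txt,/root/file2.txt"

// Before: the whole string becomes a single Path, so Hadoop looks for one file
// literally named "/root/file1.txt,/root/file2.txt" and fails.
// FileInputFormat.addInputPath(job, new Path(path))

// After: setInputPaths(Job, String) splits the string on commas and registers
// each piece as a separate input path, matching hadoopFile()/textFile().
FileInputFormat.setInputPaths(job, path)
```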

A unit test has also been added in SparkContextSuite.scala. It creates temporary
text files as input and runs sc.textFile(), sc.hadoopFile(), sc.newAPIHadoopFile(),
sc.wholeTextFiles(), and sc.binaryFiles() against comma-separated path lists.

Note: This contribution is my original work and I license the work to the
project under the project's open source license.

Author: yongtang <[email protected]>

Closes #5708 from yongtang/SPARK-7155 and squashes the following commits:

654c80c [yongtang] [SPARK-7155] [CORE] Remove unneeded temp file deletion in 
unit test as parent dir is already temporary.
26faa6a [yongtang] [SPARK-7155] [CORE] Support comma-separated list of files as 
input for newAPIHadoopFile, wholeTextFiles, and binaryFiles. Use setInputPaths 
for consistency.
73e1f16 [yongtang] [SPARK-7155] [CORE] Allow newAPIHadoopFile to support 
comma-separated list of files as input.

(cherry picked from commit 3fc6cfd079d8cdd35574605cb9a4178ca7f2613d)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a41a132
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a41a132
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a41a132

Branch: refs/heads/branch-1.3
Commit: 3a41a1327c9bc9af909f7d5ee8ddf18a21baab05
Parents: 5b893bd
Author: yongtang <[email protected]>
Authored: Wed Apr 29 23:55:51 2015 +0100
Committer: Sean Owen <[email protected]>
Committed: Wed Apr 29 23:56:00 2015 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 12 +++-
 .../org/apache/spark/SparkContextSuite.scala    | 63 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a41a132/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66426f3..98da676 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -621,7 +621,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   RDD[(String, String)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new WholeTextFileRDD(
       this,
@@ -667,7 +669,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       RDD[(String, PortableDataStream)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,
@@ -843,7 +847,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3a41a132/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 26d1019..b4be8e3 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -25,7 +25,9 @@ import com.google.common.io.Files
 
 import org.scalatest.FunSuite
 
-import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.apache.spark.util.Utils
 
 import scala.concurrent.Await
@@ -190,4 +192,63 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles 
(SPARK-7155)") {
+    // Regression test for SPARK-7155
+    // dir1 and dir2 are used for wholeTextFiles and binaryFiles
+    val dir1 = Utils.createTempDir()
+    val dir2 = Utils.createTempDir()
+
+    val dirpath1=dir1.getAbsolutePath
+    val dirpath2=dir2.getAbsolutePath
+
+    // file1 and file2 are placed inside dir1, they are also used for
+    // textFile, hadoopFile, and newAPIHadoopFile
+    // file3, file4 and file5 are placed inside dir2, they are used for
+    // textFile, hadoopFile, and newAPIHadoopFile as well
+    val file1 = new File(dir1, "part-00000")
+    val file2 = new File(dir1, "part-00001")
+    val file3 = new File(dir2, "part-00000")
+    val file4 = new File(dir2, "part-00001")
+    val file5 = new File(dir2, "part-00002")
+
+    val filepath1=file1.getAbsolutePath
+    val filepath2=file2.getAbsolutePath
+    val filepath3=file3.getAbsolutePath
+    val filepath4=file4.getAbsolutePath
+    val filepath5=file5.getAbsolutePath
+
+
+    try {
+      // Create 5 text files.
+      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in 
file1", file1, UTF_8)
+      Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8)
+      Files.write("someline1 in file3", file3, UTF_8)
+      Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8)
+      Files.write("someline1 in file2\nsomeline2 in file5", file5, UTF_8)
+
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+      // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
+      assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
+      assert(sc.hadoopFile(filepath1 + "," + filepath2,
+        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
+        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+      // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
+      assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
+      assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+               classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+               classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+      // Test wholeTextFiles, and binaryFiles for dir1 and dir2
+      assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
+      assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
+
+    } finally {
+      sc.stop()
+    }
+  }
 }


