Repository: spark
Updated Branches:
refs/heads/master 191d7cf2a -> 48f42781d
[SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter
The check ```if (!fs.getFileStatus(path).isDir) throw Exception``` no longer makes sense after commit #1370.
If anyone is working on SPARK-2551, please be careful to make sure the new change passes the test case ```test("Read a parquet file instead of a directory")```.
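As an illustration of the behavior this patch enables, here is a minimal usage sketch; the SQLContext setup, paths, and part-file name below are hypothetical, not part of the patch:

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc) // sc: an existing SparkContext

// Previously parquetFile() required a directory of Parquet files;
// after this patch a single part-file can be passed directly.
// (The path and part-file name are hypothetical.)
val singleFile = sqlContext.parquetFile("/data/table/part-r-00001.parquet")
singleFile.registerTempTable("single")
sqlContext.sql("SELECT COUNT(*) FROM single").collect()

// Passing the enclosing directory still works as before.
val wholeDir = sqlContext.parquetFile("/data/table")
```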
Author: chutium <[email protected]>
Closes #2044 from chutium/parquet-singlefile and squashes the following commits:
4ae477f [chutium] [SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48f42781
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48f42781
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48f42781
Branch: refs/heads/master
Commit: 48f42781dedecd38ddcb2dcf67dead92bb4318f5
Parents: 191d7cf
Author: chutium <[email protected]>
Authored: Wed Aug 27 13:13:04 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Wed Aug 27 13:13:04 2014 -0700
----------------------------------------------------------------------
.../apache/spark/sql/parquet/ParquetTypes.scala | 7 ++---
.../spark/sql/parquet/ParquetQuerySuite.scala | 27 +++++++++++++++++---
2 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/48f42781/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 1a52377..2941b97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -394,17 +394,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
-    if (!fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"Expected $path for be a directory with Parquet files/metadata")
-    }
-    ParquetRelation.enableLogForwarding()
 
     val children = fs.listStatus(path).filterNot { status =>
       val name = status.getPath.getName
       (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
     }
 
+    ParquetRelation.enableLogForwarding()
+
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that
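Why dropping the isDir check is enough: Hadoop's FileSystem.listStatus returns a one-element array containing the file's own status when the path points at a plain file, so the same children filter covers both a directory of part-files and a single file. A small standalone sketch of that filter follows; the local-filesystem setup and paths are illustrative assumptions, and the "_metadata" literal stands in for ParquetFileWriter.PARQUET_METADATA_FILE:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListStatusSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())

    // Mirrors the filter above: drop hidden ('.') and underscore-prefixed
    // entries, but always keep the "_metadata" summary file.
    def parquetChildren(p: Path) =
      fs.listStatus(p).filterNot { status =>
        val name = status.getPath.getName
        (name(0) == '.' || name(0) == '_') && name != "_metadata"
      }

    // Hypothetical paths: for a directory, listStatus lists its entries;
    // for a single file, it returns that file's own status. Either way,
    // the result is a set of statuses whose footers can be read.
    parquetChildren(new Path("/data/table")).foreach(s => println(s.getPath))
    parquetChildren(new Path("/data/table/part-r-00001.parquet")).foreach(s => println(s.getPath))
  }
}
```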
http://git-wip-us.apache.org/repos/asf/spark/blob/48f42781/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 4219cc0..42923b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
 
-
 case class TestRDDEntry(key: Int, value: String)
 
 case class NullReflectData(
@@ -420,8 +419,30 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     val rdd_copy = sql("SELECT * FROM tmpx").collect()
     val rdd_orig = rdd.collect()
     for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+    }
+    Utils.deleteRecursively(file)
+  }
+
+  test("Read a parquet file instead of a directory") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val fsPath = new Path(path)
+    val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.coalesce(1).saveAsParquetFile(path)
+
+    val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
+    assert(children.length > 0)
+    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
+    readFile.registerTempTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
     }
     Utils.deleteRecursively(file)
   }