Repository: spark
Updated Branches:
  refs/heads/master bec938f77 -> 7ff16e8ab


http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 10d0ede..3bbc5b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with 
ParquetTest {
         }.flatten.reduceOption(_ && _)
 
         val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(_: 
ParquetRelation2)) => filters
+          case PhysicalOperation(_, filters, LogicalRelation(_: 
FSBasedParquetRelation)) => filters
         }.flatten.reduceOption(_ && _)
 
         forParquetTableScan.orElse(forParquetDataSource)
@@ -350,7 +350,7 @@ class ParquetDataSourceOffFilterSuite extends 
ParquetFilterSuiteBase with Before
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, 
originalConf.toString)
   }
-  
+
   test("SPARK-6742: don't push down predicates which reference partition 
columns") {
     import sqlContext.implicits._
 
@@ -365,7 +365,7 @@ class ParquetDataSourceOffFilterSuite extends 
ParquetFilterSuiteBase with Before
           path,
           Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
           Seq(AttributeReference("part", IntegerType, false)()) ))
-       
+
         checkAnswer(
           df.filter("a = 1 or part = 1"),
           (1 to 3).map(i => Row(1, i, i.toString)))

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index b504842..7c371db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -119,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest 
{
     }
 
     // Decimals with precision above 18 are not yet supported
-    intercept[RuntimeException] {
+    intercept[Throwable] {
       withTempPath { dir =>
         makeDecimalRDD(DecimalType(19, 
10)).saveAsParquetFile(dir.getCanonicalPath)
         parquetFile(dir.getCanonicalPath).collect()
@@ -127,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest 
{
     }
 
     // Unlimited-length decimals are not yet supported
-    intercept[RuntimeException] {
+    intercept[Throwable] {
       withTempPath { dir =>
         
makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
         parquetFile(dir.getCanonicalPath).collect()
@@ -419,7 +419,7 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase 
with BeforeAndAfterA
   test("SPARK-6330 regression test") {
     // In 1.3.0, save to fs other than file: without configuring core-site.xml 
would get:
     // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
-    intercept[java.io.FileNotFoundException] {
+    intercept[Throwable] {
       sqlContext.parquetFile("file:///nonexistent")
     }
     val errorMessage = intercept[Throwable] {

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index bea568e..138e197 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -39,7 +39,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest {
   import sqlContext._
   import sqlContext.implicits._
 
-  val defaultPartitionName = "__NULL__"
+  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
 
   test("column type inference") {
     def check(raw: String, literal: Literal): Unit = {
@@ -252,9 +252,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest {
 
       val parquetRelation = load(
         "org.apache.spark.sql.parquet",
-        Map(
-          "path" -> base.getCanonicalPath,
-          ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
+        Map("path" -> base.getCanonicalPath))
 
       parquetRelation.registerTempTable("t")
 
@@ -297,9 +295,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest {
 
       val parquetRelation = load(
         "org.apache.spark.sql.parquet",
-        Map(
-          "path" -> base.getCanonicalPath,
-          ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName))
+        Map("path" -> base.getCanonicalPath))
 
       parquetRelation.registerTempTable("t")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index c964b6d..fc90e3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("lowerCase", StringType),
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      FSBasedParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("lowercase", StringType),
           StructField("uppercase", DoubleType, nullable = false))),
@@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       StructType(Seq(
         StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      FSBasedParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false))),
 
@@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
 
     // Metastore schema contains additional non-nullable fields.
     assert(intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      FSBasedParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false),
           StructField("lowerCase", BinaryType, nullable = false))),
@@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
 
     // Conflicting non-nullable field names
     intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      FSBasedParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(StructField("lower", StringType, nullable = false))),
         StructType(Seq(StructField("lowerCase", BinaryType))))
     }
@@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("firstField", StringType, nullable = true),
         StructField("secondField", StringType, nullable = true),
         StructField("thirdfield", StringType, nullable = true)))) {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      FSBasedParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),
@@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
     // Merge should fail if the Metastore contains any additional fields that 
are not
     // nullable.
     assert(intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      FSBasedParquetRelation.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("firstfield", StringType, nullable = true),
           StructField("secondfield", StringType, nullable = true),

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d754c8e..b0e82c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, 
LogicalRelation, Partition => ParquetPartition, PartitionSpec, 
ResolvedDataSource}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
     // serialize the Metastore schema to JSON and pass it as a data source 
option because of the
     // evil case insensitivity issue, which is reconciled within 
`ParquetRelation2`.
     val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+      FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+      FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, 
metastoreRelation.tableName)
 
@@ -238,13 +238,15 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): 
Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) 
=>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
             parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
-            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+            parquetRelation.partitionSpec == 
partitionSpecInMetastore.getOrElse {
+              PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
+            }
 
           if (useCached) {
             Some(logical)
@@ -256,7 +258,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} 
should be stored " +
-              s"as Parquet. However, we are getting a ${other} from the 
metastore cache. " +
+              s"as Parquet. However, we are getting a $other from the 
metastore cache. " +
               s"This cached entry will be invalidated.")
           cachedDataSourceTables.invalidate(tableIdentifier)
           None
@@ -278,8 +280,9 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
 
       val cached = getCached(tableIdentifier, paths, metastoreSchema, 
Some(partitionSpec))
       val parquetRelation = cached.getOrElse {
-        val created =
-          LogicalRelation(ParquetRelation2(paths, parquetOptions, None, 
Some(partitionSpec))(hive))
+        val created = LogicalRelation(
+          new FSBasedParquetRelation(
+            paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
@@ -290,8 +293,8 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
 
       val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
       val parquetRelation = cached.getOrElse {
-        val created =
-          LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+        val created = LogicalRelation(
+          new FSBasedParquetRelation(paths.toArray, None, None, 
parquetOptions)(hive))
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 47c60f6..da5d203 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -21,21 +21,18 @@ import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.BeforeAndAfterEach
-
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql._
-import org.apache.spark.util.Utils
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Tests for persisting tables created though the data sources API into the 
metastore.
@@ -582,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with 
BeforeAndAfterEach {
       )
 
       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(p: ParquetRelation2) => // OK
+        case LogicalRelation(p: FSBasedParquetRelation) => // OK
         case _ =>
           fail(
             "test_parquet_ctas should be converted to " +
-            s"${classOf[ParquetRelation2].getCanonicalName}")
+            s"${classOf[FSBasedParquetRelation].getCanonicalName}")
       }
 
       // Clenup and reset confs.

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a5744cc..1d6393a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -19,16 +19,14 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.errors.DialectException
-import org.apache.spark.sql.DefaultParserDialect
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
-import org.apache.spark.sql.hive.MetastoreRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim}
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
+import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, 
QueryTest, Row, SQLConf}
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)
@@ -176,17 +174,17 @@ class SQLQuerySuite extends QueryTest {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = 
{
       val relation = 
EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
       relation match {
-        case LogicalRelation(r: ParquetRelation2) =>
+        case LogicalRelation(r: FSBasedParquetRelation) =>
           if (!isDataSourceParquet) {
             fail(
               s"${classOf[MetastoreRelation].getCanonicalName} is expected, 
but found " +
-              s"${ParquetRelation2.getClass.getCanonicalName}.")
+              s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
           }
 
         case r: MetastoreRelation =>
           if (isDataSourceParquet) {
             fail(
-              s"${ParquetRelation2.getClass.getCanonicalName} is expected, but 
found " +
+              s"${FSBasedParquetRelation.getClass.getCanonicalName} is 
expected, but found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
           }
       }
@@ -596,7 +594,7 @@ class SQLQuerySuite extends QueryTest {
       sql(s"DROP TABLE $tableName")
     }
   }
-  
+
   test("SPARK-5203 union with different decimal precision") {
     Seq.empty[(Decimal, Decimal)]
       .toDF("d1", "d2")

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index bf1121d..41bcbe8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,16 +21,15 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.{QueryTest, SQLConf}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
-import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, 
InsertIntoFSBasedRelation, LogicalRelation}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
 
 // The data where the partitioning key exists only in the directory structure.
@@ -292,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
     )
 
     table("test_parquet_ctas").queryExecution.optimizedPlan match {
-      case LogicalRelation(p: ParquetRelation2) => // OK
-      case _ =>
-        fail(
-          s"test_parquet_ctas should be converted to 
${classOf[ParquetRelation2].getCanonicalName}")
+      case LogicalRelation(_: FSBasedParquetRelation) => // OK
+      case _ => fail(
+        "test_parquet_ctas should be converted to " +
+          s"${classOf[FSBasedParquetRelation].getCanonicalName}")
     }
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -316,12 +315,10 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(
-        InsertIntoDataSource(
-          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case ExecutedCommand(InsertIntoFSBasedRelation(_: 
FSBasedParquetRelation, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[ParquetRelation2].getCanonicalName} and " +
-        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the 
SparkPlan." +
+        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the 
SparkPlan. " +
         s"However, found a ${o.toString} ")
     }
 
@@ -348,11 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM 
jt_array")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(
-        InsertIntoDataSource(
-          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case ExecutedCommand(InsertIntoFSBasedRelation(r: 
FSBasedParquetRelation, _, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the 
SparkPlan." +
         s"However, found a ${o.toString} ")
     }
@@ -383,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
 
     assertResult(2) {
       analyzed.collect {
-        case r @ LogicalRelation(_: ParquetRelation2) => r
+        case r @ LogicalRelation(_: FSBasedParquetRelation) => r
       }.size
     }
 
@@ -395,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
         case null => fail("Converted test_parquet should be cached in the 
cache.")
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => 
// OK
+        case logical @ LogicalRelation(parquetRelation: 
FSBasedParquetRelation) => // OK
         case other =>
           fail(
             "The cached test_parquet should be a Parquet Relation. " +
@@ -693,7 +688,7 @@ class ParquetDataSourceOnSourceSuite extends 
ParquetSourceSuiteBase {
 
     val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
     val df2 = df.as('x).join(df.as('y), $"x.str" === 
$"y.str").groupBy("y.str").max("y.int")
-    intercept[RuntimeException](df2.saveAsParquetFile(filePath))
+    intercept[Throwable](df2.saveAsParquetFile(filePath))
 
     val df3 = df2.toDF("str", "max_int")
     df3.saveAsParquetFile(filePath2)

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
deleted file mode 100644
index e8b48a0..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.parquet.ParquetTest
-import org.apache.spark.sql.types._
-
-// TODO Don't extend ParquetTest
-// This test suite extends ParquetTest for some convenient utility methods. 
These methods should be
-// moved to some more general places, maybe QueryTest.
-class FSBasedRelationSuite extends QueryTest with ParquetTest {
-  override val sqlContext: SQLContext = TestHive
-
-  import sqlContext._
-  import sqlContext.implicits._
-
-  val dataSchema =
-    StructType(
-      Seq(
-        StructField("a", IntegerType, nullable = false),
-        StructField("b", StringType, nullable = false)))
-
-  val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
-
-  val partitionedTestDF1 = (for {
-    i <- 1 to 3
-    p2 <- Seq("foo", "bar")
-  } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
-
-  val partitionedTestDF2 = (for {
-    i <- 1 to 3
-    p2 <- Seq("foo", "bar")
-  } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
-
-  val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2)
-
-  def checkQueries(df: DataFrame): Unit = {
-    // Selects everything
-    checkAnswer(
-      df,
-      for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, 
s"val_$i", p1, p2))
-
-    // Simple filtering and partition pruning
-    checkAnswer(
-      df.filter('a > 1 && 'p1 === 2),
-      for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, 
p2))
-
-    // Simple projection and filtering
-    checkAnswer(
-      df.filter('a > 1).select('b, 'a + 1),
-      for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield 
Row(s"val_$i", i + 1))
-
-    // Simple projection and partition pruning
-    checkAnswer(
-      df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
-      for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
-
-    // Self-join
-    df.registerTempTable("t")
-    withTempTable("t") {
-      checkAnswer(
-        sql(
-          """SELECT l.a, r.b, l.p1, r.p2
-            |FROM t l JOIN t r
-            |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
-          """.stripMargin),
-        for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, 
s"val_$i", p1, p2))
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Overwrite") {
-    withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite)
-
-      checkAnswer(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        testDF.collect())
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Append") {
-    withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Append)
-
-      checkAnswer(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)).orderBy("a"),
-        testDF.unionAll(testDF).orderBy("a").collect())
-    }
-  }
-
-  test("save()/load() - non-partitioned table - ErrorIfExists") {
-    withTempDir { file =>
-      intercept[RuntimeException] {
-        testDF.save(
-          path = file.getCanonicalPath,
-          source = classOf[SimpleTextSource].getCanonicalName,
-          mode = SaveMode.ErrorIfExists)
-      }
-    }
-  }
-
-  test("save()/load() - non-partitioned table - Ignore") {
-    withTempDir { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Ignore)
-
-      val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-      assert(fs.listStatus(path).isEmpty)
-    }
-  }
-
-  test("save()/load() - partitioned table - simple queries") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.ErrorIfExists,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkQueries(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)))
-    }
-  }
-
-  test("save()/load() - partitioned table - simple queries - partition columns 
in data") {
-    withTempDir { file =>
-      val basePath = new Path(file.getCanonicalPath)
-      val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
-      val qualifiedBasePath = fs.makeQualified(basePath)
-
-      for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
-        val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
-        sparkContext
-          .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
-          .saveAsTextFile(partitionDir.toString)
-      }
-
-      val dataSchemaWithPartition =
-        StructType(dataSchema.fields :+ StructField("p1", IntegerType, 
nullable = true))
-
-      checkQueries(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchemaWithPartition.json)))
-    }
-  }
-
-  test("save()/load() - partitioned table - Overwrite") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - Append") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.unionAll(partitionedTestDF).collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - Append - new partition values") {
-    withTempPath { file =>
-      partitionedTestDF1.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF2.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      checkAnswer(
-        load(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
-        partitionedTestDF.collect())
-    }
-  }
-
-  test("save()/load() - partitioned table - ErrorIfExists") {
-    withTempDir { file =>
-      intercept[RuntimeException] {
-        partitionedTestDF.save(
-          source = classOf[SimpleTextSource].getCanonicalName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("path" -> file.getCanonicalPath),
-          partitionColumns = Seq("p1", "p2"))
-      }
-    }
-  }
-
-  test("save()/load() - partitioned table - Ignore") {
-    withTempDir { file =>
-      partitionedTestDF.save(
-        path = file.getCanonicalPath,
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Ignore)
-
-      val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
-      assert(fs.listStatus(path).isEmpty)
-    }
-  }
-
-  def withTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sql(s"DROP TABLE $tableName")
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Overwrite") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
-
-    withTable("t") {
-      checkAnswer(table("t"), testDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Append") {
-    testDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite)
-
-    testDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Append)
-
-    withTable("t") {
-      checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      intercept[AnalysisException] {
-        testDF.saveAsTable(
-          tableName = "t",
-          source = classOf[SimpleTextSource].getCanonicalName,
-          mode = SaveMode.ErrorIfExists)
-      }
-    }
-  }
-
-  test("saveAsTable()/load() - non-partitioned table - Ignore") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      testDF.saveAsTable(
-        tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Ignore)
-
-      assert(table("t").collect().isEmpty)
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - simple queries") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite,
-      Map("dataSchema" -> dataSchema.json))
-
-    withTable("t") {
-      checkQueries(table("t"))
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Overwrite") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), 
partitionedTestDF.unionAll(partitionedTestDF).collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append - new partition 
values") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF2.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    withTable("t") {
-      checkAnswer(table("t"), partitionedTestDF.collect())
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Append - mismatched 
partition columns") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = classOf[SimpleTextSource].getCanonicalName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    // Using only a subset of all partition columns
-    intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1"))
-    }
-
-    // Using different order of partition columns
-    intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p2", "p1"))
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      intercept[AnalysisException] {
-        partitionedTestDF.saveAsTable(
-          tableName = "t",
-          source = classOf[SimpleTextSource].getCanonicalName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("dataSchema" -> dataSchema.json),
-          partitionColumns = Seq("p1", "p2"))
-      }
-    }
-  }
-
-  test("saveAsTable()/load() - partitioned table - Ignore") {
-    Seq.empty[(Int, String)].toDF().registerTempTable("t")
-
-    withTempTable("t") {
-      partitionedTestDF.saveAsTable(
-        tableName = "t",
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Ignore,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1", "p2"))
-
-      assert(table("t").collect().isEmpty)
-    }
-  }
-
-  test("Hadoop style globbing") {
-    withTempPath { file =>
-      partitionedTestDF.save(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      val df = load(
-        source = classOf[SimpleTextSource].getCanonicalName,
-        options = Map(
-          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
-          "dataSchema" -> dataSchema.json))
-
-      val expectedPaths = Set(
-        s"${file.getCanonicalFile}/p1=1/p2=foo",
-        s"${file.getCanonicalFile}/p1=2/p2=foo",
-        s"${file.getCanonicalFile}/p1=1/p2=bar",
-        s"${file.getCanonicalFile}/p1=2/p2=bar"
-      ).map { p =>
-        val path = new Path(p)
-        val fs = 
path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
-      }
-
-      val actualPaths = df.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: FSBasedRelation) =>
-          relation.paths.toSet
-      }.getOrElse {
-        fail("Expect an FSBasedRelation, but none could be found")
-      }
-
-      assert(actualPaths === expectedPaths)
-      checkAnswer(df, partitionedTestDF.collect())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7ff16e8a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
new file mode 100644
index 0000000..394833f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetTest
+import org.apache.spark.sql.types._
+
+// TODO Don't extend ParquetTest
+// This test suite extends ParquetTest for some convenient utility methods. These methods should be
+// moved to some more general places, maybe QueryTest.
+//
+// Shared save()/load() and saveAsTable() round-trip tests. Every test goes through
+// `dataSourceName`, which the concrete suites in this file override to pick their data source.
+class FSBasedRelationTest extends QueryTest with ParquetTest {
+  override val sqlContext: SQLContext = TestHive
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  // Canonical class name of the data source that every test below writes to and reads from.
+  val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+  // Schema of the two data columns; handed to the data source as the "dataSchema" option.
+  val dataSchema: StructType =
+    StructType(
+      Seq(
+        StructField("a", IntegerType, nullable = false),
+        StructField("b", StringType, nullable = false)))
+
+  // Rows (i, "val_i") for i in 1..3.
+  val testDF: DataFrame = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
+
+  // The rows of `testDF` repeated for p2 = "foo" and p2 = "bar", all with p1 = 1.
+  val partitionedTestDF1: DataFrame = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2")
+
+  // Same rows as `partitionedTestDF1`, but with p1 = 2.
+  val partitionedTestDF2: DataFrame = (for {
+    i <- 1 to 3
+    p2 <- Seq("foo", "bar")
+  } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2")
+
+  // Union of both halves: every combination of i in 1..3, p1 in 1..2 and p2 in {"foo", "bar"}.
+  val partitionedTestDF: DataFrame = partitionedTestDF1.unionAll(partitionedTestDF2)
+
+  /**
+   * Runs a fixed set of queries against `df` and checks their results. The expected answers are
+   * hard-coded, so `df` must hold exactly the rows of `partitionedTestDF` with columns
+   * (a, b, p1, p2).
+   */
+  def checkQueries(df: DataFrame): Unit = {
+    // Selects everything
+    checkAnswer(
+      df,
+      for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+
+    // Simple filtering and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 === 2),
+      for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2))
+
+    // Simple projection and filtering
+    checkAnswer(
+      df.filter('a > 1).select('b, 'a + 1),
+      for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1))
+
+    // Simple projection and partition pruning
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
+
+    // Self-join
+    // `df` is exposed to SQL as temp table "t"; `withTempTable` presumably drops it afterwards.
+    df.registerTempTable("t")
+    withTempTable("t") {
+      checkAnswer(
+        sql(
+          """SELECT l.a, r.b, l.p1, r.p2
+            |FROM t l JOIN t r
+            |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2
+          """.stripMargin),
+        for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2))
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Overwrite") {
+    withTempPath { file =>
+      // Written twice on purpose: the second Overwrite must replace, not add to, the first.
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        testDF.collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Append") {
+    withTempPath { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Append)
+
+      // After the append every row is expected twice.
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)).orderBy("a"),
+        testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("save()/load() - non-partitioned table - ErrorIfExists") {
+    // `withTempDir` presumably hands over a directory that already exists, so the save must fail.
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        testDF.save(
+          path = file.getCanonicalPath,
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("save()/load() - non-partitioned table - Ignore") {
+    withTempDir { file =>
+      testDF.save(
+        path = file.getCanonicalPath,
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      // Ignore must not write anything into the target directory.
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  test("save()/load() - partitioned table - simple queries") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.ErrorIfExists,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkQueries(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)))
+    }
+  }
+
+  test("save()/load() - partitioned table - Overwrite") {
+    withTempPath { file =>
+      // Written twice on purpose: the second Overwrite must replace, not add to, the first.
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      // After the append every row is expected twice.
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - Append - new partition values") {
+    withTempPath { file =>
+      // The p1 = 1 half is written first, then the p1 = 2 half is appended next to it.
+      partitionedTestDF1.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF2.save(
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(
+        load(
+          source = dataSourceName,
+          options = Map(
+            "path" -> file.getCanonicalPath,
+            "dataSchema" -> dataSchema.json)),
+        partitionedTestDF.collect())
+    }
+  }
+
+  test("save()/load() - partitioned table - ErrorIfExists") {
+    // `withTempDir` presumably hands over a directory that already exists, so the save must fail.
+    withTempDir { file =>
+      intercept[RuntimeException] {
+        partitionedTestDF.save(
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("path" -> file.getCanonicalPath),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("save()/load() - partitioned table - Ignore") {
+    withTempDir { file =>
+      // Partition columns are passed so that this really is the partitioned write path.
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Ignore,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      // Ignore must not write anything into the target directory.
+      val path = new Path(file.getCanonicalPath)
+      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      assert(fs.listStatus(path).isEmpty)
+    }
+  }
+
+  /**
+   * Runs `f`, then drops table `tableName` whether or not `f` succeeded.
+   *
+   * The drop uses `IF EXISTS` so that, when `f` fails before the table has been created, the
+   * cleanup itself does not throw and hide the original failure.
+   */
+  def withTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  // In the saveAsTable() tests below, every write happens inside `withTable`, so table "t" is
+  // dropped even when a write or a check fails half way through.
+
+  test("saveAsTable()/load() - non-partitioned table - Overwrite") {
+    withTable("t") {
+      testDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        Map("dataSchema" -> dataSchema.json))
+
+      checkAnswer(table("t"), testDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Append") {
+    withTable("t") {
+      testDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite)
+
+      testDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Append)
+
+      // After the append every row is expected twice.
+      checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
+    // An empty temp table named "t" plays the role of the already existing table.
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        testDF.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists)
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - non-partitioned table - Ignore") {
+    // An empty temp table named "t" plays the role of the already existing table.
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      testDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Ignore)
+
+      // The existing (empty) table must be left untouched.
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - simple queries") {
+    withTable("t") {
+      // Partition columns are passed so that `checkQueries` runs against a partitioned table.
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkQueries(table("t"))
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Overwrite") {
+    withTable("t") {
+      // Written twice on purpose: the second Overwrite must replace, not add to, the first.
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append") {
+    withTable("t") {
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      // After the append every row is expected twice.
+      checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - new partition values") {
+    withTable("t") {
+      // The p1 = 1 half is written first, then the p1 = 2 half is appended next to it.
+      partitionedTestDF1.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      partitionedTestDF2.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Append,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      checkAnswer(table("t"), partitionedTestDF.collect())
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
+    withTable("t") {
+      partitionedTestDF1.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      // NOTE(review): `Throwable` is kept because the concrete failure type is not visible here;
+      // narrow it once the thrown exception is confirmed.
+
+      // Using only a subset of all partition columns
+      intercept[Throwable] {
+        partitionedTestDF2.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.Append,
+          options = Map("dataSchema" -> dataSchema.json),
+          partitionColumns = Seq("p1"))
+      }
+
+      // Using different order of partition columns
+      intercept[Throwable] {
+        partitionedTestDF2.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.Append,
+          options = Map("dataSchema" -> dataSchema.json),
+          partitionColumns = Seq("p2", "p1"))
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
+    // An empty temp table named "t" plays the role of the already existing table.
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      intercept[AnalysisException] {
+        partitionedTestDF.saveAsTable(
+          tableName = "t",
+          source = dataSourceName,
+          mode = SaveMode.ErrorIfExists,
+          options = Map("dataSchema" -> dataSchema.json),
+          partitionColumns = Seq("p1", "p2"))
+      }
+    }
+  }
+
+  test("saveAsTable()/load() - partitioned table - Ignore") {
+    // An empty temp table named "t" plays the role of the already existing table.
+    Seq.empty[(Int, String)].toDF().registerTempTable("t")
+
+    withTempTable("t") {
+      partitionedTestDF.saveAsTable(
+        tableName = "t",
+        source = dataSourceName,
+        mode = SaveMode.Ignore,
+        options = Map("dataSchema" -> dataSchema.json),
+        partitionColumns = Seq("p1", "p2"))
+
+      // The existing (empty) table must be left untouched.
+      assert(table("t").collect().isEmpty)
+    }
+  }
+
+  test("Hadoop style globbing") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = dataSourceName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      // `*` matches any p1 value; `???` matches the three-letter p2 values "foo" and "bar".
+      val df = load(
+        source = dataSourceName,
+        options = Map(
+          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
+          "dataSchema" -> dataSchema.json))
+
+      // The glob is expected to expand to the four leaf partition directories, fully qualified.
+      val expectedPaths = Set(
+        s"${file.getCanonicalFile}/p1=1/p2=foo",
+        s"${file.getCanonicalFile}/p1=2/p2=foo",
+        s"${file.getCanonicalFile}/p1=1/p2=bar",
+        s"${file.getCanonicalFile}/p1=2/p2=bar"
+      ).map { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+      }
+
+      val actualPaths = df.queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: FSBasedRelation) =>
+          relation.paths.toSet
+      }.getOrElse {
+        fail("Expect an FSBasedRelation, but none could be found")
+      }
+
+      assert(actualPaths === expectedPaths)
+      checkAnswer(df, partitionedTestDF.collect())
+    }
+  }
+}
+
+// Runs the shared FSBasedRelationTest tests against SimpleTextSource and adds one test in which
+// the partition column `p1` is also stored inside the data files.
+class SimpleTextRelationSuite extends FSBasedRelationTest {
+  override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+  import sqlContext._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { dir =>
+      val root = new Path(dir.getCanonicalPath)
+      val qualifiedRoot = root.getFileSystem(SparkHadoopUtil.get.conf).makeQualified(root)
+
+      // One text output per (p1, p2) directory; each line also carries p1 as its third field.
+      (1 to 2).foreach { p1 =>
+        Seq("foo", "bar").foreach { p2 =>
+          val lines = (1 to 3).map(i => s"$i,val_$i,$p1")
+          val target = new Path(qualifiedRoot, s"p1=$p1/p2=$p2")
+          sparkContext.parallelize(lines).saveAsTextFile(target.toString)
+        }
+      }
+
+      // The data schema now includes p1, since the files themselves contain that column.
+      val schemaWithP1 =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      val loaded = load(
+        source = dataSourceName,
+        options = Map(
+          "path" -> dir.getCanonicalPath,
+          "dataSchema" -> schemaWithP1.json))
+
+      checkQueries(loaded)
+    }
+  }
+}
+
+// Runs the shared FSBasedRelationTest tests against the Parquet data source and adds one test in
+// which the partition column `p1` is also stored inside the Parquet files.
+class FSBasedParquetRelationSuite extends FSBasedRelationTest {
+  override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
+
+  import sqlContext._
+  import sqlContext.implicits._
+
+  test("save()/load() - partitioned table - simple queries - partition columns in data") {
+    withTempDir { dir =>
+      val root = new Path(dir.getCanonicalPath)
+      val qualifiedRoot = root.getFileSystem(SparkHadoopUtil.get.conf).makeQualified(root)
+
+      // One Parquet output per (p1, p2) directory; each row also carries p1 as a data column.
+      (1 to 2).foreach { p1 =>
+        Seq("foo", "bar").foreach { p2 =>
+          val rows = (1 to 3).map(i => (i, s"val_$i", p1))
+          val target = new Path(qualifiedRoot, s"p1=$p1/p2=$p2")
+          sparkContext
+            .parallelize(rows)
+            .toDF("a", "b", "p1")
+            .saveAsParquetFile(target.toString)
+        }
+      }
+
+      // The data schema now includes p1, since the files themselves contain that column.
+      val schemaWithP1 =
+        StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+      val loaded = load(
+        source = dataSourceName,
+        options = Map(
+          "path" -> dir.getCanonicalPath,
+          "dataSchema" -> schemaWithP1.json))
+
+      checkQueries(loaded)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to