Repository: spark
Updated Branches:
  refs/heads/branch-1.5 54334d378 -> d83dcc9a0


[SPARK-10672] [SQL] Do not fail when we cannot save the metadata of a data 
source table in a hive compatible way

https://issues.apache.org/jira/browse/SPARK-10672

With changes in this PR, we will fall back to saving the metadata of a table 
in a Spark SQL specific way if we fail to save it in a Hive compatible way 
(Hive throws an exception because of its internal restrictions, e.g. binary 
and decimal types cannot be saved to parquet if the metastore is running Hive 
0.13). I manually tested the fix with the following test in 
`DataSourceWithHiveMetastoreCatalogSuite` 
(`spark.sql.hive.metastore.version=0.13` and 
`spark.sql.hive.metastore.jars`=`maven`).

```
    test(s"fail to save metadata of a parquet table in hive 0.13") {
      withTempPath { dir =>
        withTable("t") {
          val path = dir.getCanonicalPath

          sql(
            s"""CREATE TABLE t USING $provider
               |OPTIONS (path '$path')
               |AS SELECT 1 AS d1, cast("val_1" as binary) AS d2
             """.stripMargin)

          sql(
            s"""describe formatted t
             """.stripMargin).collect.foreach(println)

          sqlContext.table("t").show
        }
      }
    }
  }
```

Without this fix, we will fail with the following error.
```
org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.UnsupportedOperationException: Unknown field type: binary
        at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:619)
        at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:576)
        at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply$mcV$sp(ClientWrapper.scala:359)
        at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
        at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$createTable$1.apply(ClientWrapper.scala:357)
        at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:256)
        at 
org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:211)
        at 
org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:248)
        at 
org.apache.spark.sql.hive.client.ClientWrapper.createTable(ClientWrapper.scala:357)
        at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.createDataSourceTable(HiveMetastoreCatalog.scala:358)
        at 
org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:285)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:58)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:58)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
        at 
org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
        at 
org.apache.spark.sql.test.SQLTestUtils$$anonfun$sql$1.apply(SQLTestUtils.scala:56)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:165)
        at 
org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:150)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTable(HiveMetastoreCatalogSuite.scala:52)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:162)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(HiveMetastoreCatalogSuite.scala:161)
        at 
org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:125)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.withTempPath(HiveMetastoreCatalogSuite.scala:52)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply$mcV$sp(HiveMetastoreCatalogSuite.scala:161)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite$$anonfun$4$$anonfun$apply$1.apply(HiveMetastoreCatalogSuite.scala:161)
        at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
        at 
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
        at 
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
        at 
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
        at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
        at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
        at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
        at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
        at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
        at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
        at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
        at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
        at org.scalatest.Suite$class.run(Suite.scala:1424)
        at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
        at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
        at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
        at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.org$scalatest$BeforeAndAfterAll$$super$run(HiveMetastoreCatalogSuite.scala:52)
        at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
        at 
org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
        at 
org.apache.spark.sql.hive.DataSourceWithHiveMetastoreCatalogSuite.run(HiveMetastoreCatalogSuite.scala:52)
        at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
        at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
        at sbt.ForkMain$Run$2.call(ForkMain.java:294)
        at sbt.ForkMain$Run$2.call(ForkMain.java:284)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Unknown field type: binary
        at 
org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getObjectInspector(ArrayWritableObjectInspector.java:108)
        at 
org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.<init>(ArrayWritableObjectInspector.java:60)
        at 
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:113)
        at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:339)
        at 
org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:288)
        at 
org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:194)
        at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:597)
        ... 76 more
```

Author: Yin Huai <[email protected]>

Closes #8824 from yhuai/datasourceMetadata.

(cherry picked from commit 2204cdb28483b249616068085d4e88554fe6acef)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d83dcc9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d83dcc9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d83dcc9a

Branch: refs/heads/branch-1.5
Commit: d83dcc9a0c05c033e2346716a700bfd90b7c6e98
Parents: 54334d3
Author: Yin Huai <[email protected]>
Authored: Tue Sep 22 13:29:39 2015 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue Sep 22 13:29:51 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 101 +++++++++----------
 1 file changed, 50 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d83dcc9a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f22f086..de0c7f7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -309,69 +309,68 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
     }
 
     // TODO: Support persisting partitioned data source relations in Hive 
compatible format
-    val hiveTable = (maybeSerDe, dataSource.relation) match {
+    val qualifiedTableName = tableIdent.quotedString
+    val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) 
match {
       case (Some(serde), relation: HadoopFsRelation)
-          if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
-        // Hive ParquetSerDe doesn't support decimal type until 1.2.0.
-        val isParquetSerDe = 
serde.inputFormat.exists(_.toLowerCase.contains("parquet"))
-        val hasDecimalFields = 
relation.schema.existsRecursively(_.isInstanceOf[DecimalType])
-
-        val hiveParquetSupportsDecimal = client.version match {
-          case org.apache.spark.sql.hive.client.hive.v1_2 => true
-          case _ => false
-        }
-
-        if (isParquetSerDe && !hiveParquetSupportsDecimal && hasDecimalFields) 
{
-          // If Hive version is below 1.2.0, we cannot save Hive compatible 
schema to
-          // metastore when the file format is Parquet and the schema has 
DecimalType.
-          logWarning {
-            "Persisting Parquet relation with decimal field(s) into Hive 
metastore in Spark SQL " +
-              "specific format, which is NOT compatible with Hive. Because 
ParquetHiveSerDe in " +
-              s"Hive ${client.version.fullVersion} doesn't support decimal 
type. See HIVE-6384."
-          }
-          newSparkSQLSpecificMetastoreTable()
-        } else {
-          logInfo {
-            "Persisting data source relation with a single input path into 
Hive metastore in " +
-              s"Hive compatible format. Input path: ${relation.paths.head}"
-          }
-          newHiveCompatibleMetastoreTable(relation, serde)
-        }
+        if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+        val message =
+          s"Persisting data source relation $qualifiedTableName with a single 
input path " +
+            s"into Hive metastore in Hive compatible format. Input path: 
${relation.paths.head}."
+        (Some(hiveTable), message)
 
       case (Some(serde), relation: HadoopFsRelation) if 
relation.partitionColumns.nonEmpty =>
-        logWarning {
-          "Persisting partitioned data source relation into Hive metastore in 
" +
-            s"Spark SQL specific format, which is NOT compatible with Hive.  
Input path(s): " +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting partitioned data source relation $qualifiedTableName 
into " +
+            "Hive metastore in Spark SQL specific format, which is NOT 
compatible with Hive. " +
+            "Input path(s): " + relation.paths.mkString("\n", "\n", "")
+        (None, message)
 
       case (Some(serde), relation: HadoopFsRelation) =>
-        logWarning {
-          "Persisting data source relation with multiple input paths into Hive 
metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive.  
Input paths: " +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting data source relation $qualifiedTableName with multiple 
input paths into " +
+            "Hive metastore in Spark SQL specific format, which is NOT 
compatible with Hive. " +
+            s"Input paths: " + relation.paths.mkString("\n", "\n", "")
+        (None, message)
 
       case (Some(serde), _) =>
-        logWarning {
-          s"Data source relation is not a 
${classOf[HadoopFsRelation].getSimpleName}. " +
-            "Persisting it into Hive metastore in Spark SQL specific format, " 
+
-            "which is NOT compatible with Hive."
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Data source relation $qualifiedTableName is not a " +
+            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into 
Hive metastore " +
+            "in Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
 
       case _ =>
-        logWarning {
+        val message =
           s"Couldn't find corresponding Hive SerDe for data source provider 
$provider. " +
-            "Persisting data source relation into Hive metastore in Spark SQL 
specific format, " +
-            "which is NOT compatible with Hive."
-        }
-        newSparkSQLSpecificMetastoreTable()
+            s"Persisting data source relation $qualifiedTableName into Hive 
metastore in " +
+            s"Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
     }
 
-    client.createTable(hiveTable)
+    (hiveCompitiableTable, logMessage) match {
+      case (Some(table), message) =>
+        // We first try to save the metadata of the table in a Hive 
compatiable way.
+        // If Hive throws an error, we fall back to save its metadata in the 
Spark SQL
+        // specific way.
+        try {
+          logInfo(message)
+          client.createTable(table)
+        } catch {
+          case throwable: Throwable =>
+            val warningMessage =
+              s"Could not persist $qualifiedTableName in a Hive compatible 
way. Persisting " +
+                s"it into Hive metastore in Spark SQL specific format."
+            logWarning(warningMessage, throwable)
+            val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
+            client.createTable(sparkSqlSpecificTable)
+        }
+
+      case (None, message) =>
+        logWarning(message)
+        val hiveTable = newSparkSQLSpecificMetastoreTable()
+        client.createTable(hiveTable)
+    }
   }
 
   def hiveDefaultTableFilePath(tableName: String): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to