Repository: spark Updated Branches: refs/heads/branch-2.2 7f8236c93 -> dd69ac620
[SPARK-19611][SQL][FOLLOWUP] set dataSchema correctly in HiveMetastoreCatalog.convertToLogicalRelation. We made a mistake in https://github.com/apache/spark/pull/16944. In `HiveMetastoreCatalog#inferIfNeeded` we infer the data schema, merge it with the full schema, and return the new full schema. On the caller side, we treat that full schema as the data schema and pass it to `HadoopFsRelation`. This doesn't cause any problems, because both Parquet and ORC can work with an incorrect data schema that has extra columns, but it's better to fix this mistake. How was this patch tested? N/A. Author: Wenchen Fan <[email protected]> Closes #19615 from cloud-fan/infer. (cherry picked from commit 4d9ebf3835dde1abbf9cff29a55675d9f4227620) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd69ac62 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd69ac62 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd69ac62 Branch: refs/heads/branch-2.2 Commit: dd69ac620c5dea38d22ca63488b6fdb430e81da2 Parents: 7f8236c Author: Wenchen Fan <[email protected]> Authored: Tue Oct 31 11:35:32 2017 +0100 Committer: Wenchen Fan <[email protected]> Committed: Tue Oct 31 11:36:52 2017 +0100 ---------------------------------------------------------------------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 22 ++++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dd69ac62/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e1fee9a..f23b27c 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -164,13 +164,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } - val (dataSchema, updatedTable) = - inferIfNeeded(relation, options, fileFormat, Option(fileIndex)) + val updatedTable = inferIfNeeded(relation, options, fileFormat, Option(fileIndex)) val fsRelation = HadoopFsRelation( location = fileIndex, partitionSchema = partitionSchema, - dataSchema = dataSchema, + dataSchema = updatedTable.dataSchema, // We don't support hive bucketed tables, only ones we write out. bucketSpec = None, fileFormat = fileFormat, @@ -192,13 +191,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileFormatClass, None) val logicalRelation = cached.getOrElse { - val (dataSchema, updatedTable) = inferIfNeeded(relation, options, fileFormat) + val updatedTable = inferIfNeeded(relation, options, fileFormat) val created = LogicalRelation( DataSource( sparkSession = sparkSession, paths = rootPath.toString :: Nil, - userSpecifiedSchema = Option(dataSchema), + userSpecifiedSchema = Option(updatedTable.dataSchema), // We don't support hive bucketed tables, only ones we write out. 
bucketSpec = None, options = options, @@ -226,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log relation: HiveTableRelation, options: Map[String, String], fileFormat: FileFormat, - fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = { + fileIndexOpt: Option[FileIndex] = None): CatalogTable = { val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase val tableName = relation.tableMeta.identifier.unquotedString @@ -243,21 +242,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log sparkSession, options, fileIndex.listFiles(Nil, Nil).flatMap(_.files)) - .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _)) + .map(mergeWithMetastoreSchema(relation.tableMeta.dataSchema, _)) inferredSchema match { - case Some(schema) => + case Some(dataSchema) => + val schema = StructType(dataSchema ++ relation.tableMeta.partitionSchema) if (inferenceMode == INFER_AND_SAVE) { updateCatalogSchema(relation.tableMeta.identifier, schema) } - (schema, relation.tableMeta.copy(schema = schema)) + relation.tableMeta.copy(schema = schema) case None => logWarning(s"Unable to infer schema for table $tableName from file format " + s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.") - (relation.tableMeta.schema, relation.tableMeta) + relation.tableMeta } } else { - (relation.tableMeta.schema, relation.tableMeta) + relation.tableMeta } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
