Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a73201daf -> d8ef0be83


[SPARK-18108][SQL] Fix a schema inconsistent bug that makes a parquet reader 
fail to read data

## What changes were proposed in this pull request?
A vectorized parquet reader fails to read column data if the data schema and 
the partition schema overlap and the types inferred for the partition schema 
differ from those in the data schema. Example code to reproduce this bug is 
as follows:

```
scala> case class A(a: Long, b: Int)
scala> val as = Seq(A(1, 2))
scala> spark.createDataFrame(as).write.parquet("/data/a=1/")
scala> val df = spark.read.parquet("/data/")
scala> df.printSchema
root
 |-- a: long (nullable = true)
 |-- b: integer (nullable = true)
scala> df.collect
java.lang.NullPointerException
        at 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:283)
        at 
org.apache.spark.sql.execution.vectorized.ColumnarBatch$Row.getLong(ColumnarBatch.java:191)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```
The root cause is that the logical layer (`HadoopFsRelation`) and the physical 
layer (`VectorizedParquetRecordReader`) make different assumptions about the 
partition schema: the logical layer trusts the data schema to infer the types 
of the overlapping partition columns, whereas the physical layer trusts the 
partition schema, which is inferred from the path string. To fix this bug, 
this PR updates `HadoopFsRelation.schema` to respect the position of the 
partition columns in the data schema and the types of the partition columns 
in the partition schema.

## How was this patch tested?
Added tests in `ParquetPartitionDiscoverySuite`.

Author: Takeshi YAMAMURO <[email protected]>

Closes #16030 from maropu/SPARK-18108.

(cherry picked from commit dc2a4d4ad478fdb0486cc0515d4fe8b402d24db4)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8ef0be8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8ef0be8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8ef0be8

Branch: refs/heads/branch-2.1
Commit: d8ef0be83d8d032ddab79b465226ed3ff3d1eff7
Parents: a73201d
Author: Takeshi YAMAMURO <[email protected]>
Authored: Fri Dec 16 22:44:42 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Dec 16 22:45:32 2016 +0800

----------------------------------------------------------------------
 .../execution/datasources/HadoopFsRelation.scala  | 18 +++++++++++++-----
 .../parquet/ParquetPartitionDiscoverySuite.scala  | 11 +++++++++++
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8ef0be8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 014abd4..9a08524 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 
 
 /**
@@ -49,10 +51,16 @@ case class HadoopFsRelation(
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSchema.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
-    })
+    val getColName: (StructField => String) =
+      if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else 
_.name.toLowerCase
+    val overlappedPartCols = mutable.Map.empty[String, StructField]
+    partitionSchema.foreach { partitionField =>
+      if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+        overlappedPartCols += getColName(partitionField) -> partitionField
+      }
+    }
+    StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), 
f)) ++
+      partitionSchema.filterNot(f => 
overlappedPartCols.contains(getColName(f))))
   }
 
   def partitionSchemaOption: Option[StructType] =

http://git-wip-us.apache.org/repos/asf/spark/blob/d8ef0be8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 22e35a1..f433a74 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -969,4 +969,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
       ))
     }
   }
+
+  test("SPARK-18108 Parquet reader fails when data column types conflict with 
partition ones") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = Seq((1L, 2.0)).toDF("a", "b")
+        df.write.parquet(s"$path/a=1")
+        checkAnswer(spark.read.parquet(s"$path"), Seq(Row(1, 2.0)))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to