Repository: spark
Updated Branches:
  refs/heads/master 03668348e -> 0c33c7b4a


[SPARK-7858] [SQL] Use output schema, not relation schema, for data source 
input conversion

In `DataSourceStrategy.createPhysicalRDD`, we use the relation schema as the 
target schema for converting incoming rows into Catalyst rows.  However, we 
should be using the output schema instead, since our scan might return a subset 
of the relation's columns.

This patch incorporates #6414 by liancheng, which fixes an issue in 
`SimpleTextRelation` that prevented this bug from being caught by our old tests:

> In `SimpleTextRelation`, we set `needConversion` to `true`, indicating 
> that values produced by this testing relation should be of Scala types, and 
> need to be converted to Catalyst types when necessary. However, we also used 
> `Cast` to convert strings to the expected data types. And `Cast` always produces 
> values of Catalyst types, thus no conversion is done at all. This PR makes 
> `SimpleTextRelation` produce Scala values so that data conversion code paths 
> can be properly tested.

Closes #5986.

Author: Josh Rosen <[email protected]>
Author: Cheng Lian <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #6400 from JoshRosen/SPARK-7858 and squashes the following commits:

e71c866 [Josh Rosen] Re-fix bug so that the tests pass again
56b13e5 [Josh Rosen] Add regression test to hadoopFsRelationSuites
2169a0f [Josh Rosen] Remove use of SpecificMutableRow and BufferedIterator
6cd7366 [Josh Rosen] Fix SPARK-7858 by using output types for conversion.
5a00e66 [Josh Rosen] Add assertions in order to reproduce SPARK-7858
8ba195c [Cheng Lian] Merge 9968fba9979287aaa1f141ba18bfb9d4c116a3b3 into 
61664732b25b35f94be35a42cde651cbfd0e02b7
9968fba [Cheng Lian] Tests the data type conversion code paths


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c33c7b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c33c7b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c33c7b4

Branch: refs/heads/master
Commit: 0c33c7b4a66e47f6246f1b7f2b96f2c33126ec63
Parents: 0366834
Author: Josh Rosen <[email protected]>
Authored: Tue May 26 20:24:35 2015 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue May 26 20:24:35 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |  2 +-
 .../spark/sql/execution/ExistingRDD.scala       | 62 ++++++++------------
 .../spark/sql/sources/DataSourceStrategy.scala  |  2 +-
 .../spark/sql/sources/SimpleTextRelation.scala  |  6 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |  6 ++
 5 files changed, 37 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c33c7b4/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1ea596d..3935f7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -392,7 +392,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     SparkPlan.currentContext.set(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
+    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
     DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c33c7b4/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index a500269..f931dc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -21,9 +21,9 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
@@ -31,26 +31,19 @@ import org.apache.spark.sql.{Row, SQLContext}
  */
 @DeveloperApi
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], schema: StructType): 
RDD[Row] = {
+  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): 
RDD[Row] = {
     data.mapPartitions { iterator =>
-      if (iterator.isEmpty) {
-        Iterator.empty
-      } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-        val schemaFields = schema.fields.toArray
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r.productElement(i))
-            i += 1
-          }
-
-          mutableRow
+      val numColumns = outputTypes.length
+      val mutableRow = new GenericMutableRow(numColumns)
+      val converters = 
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      iterator.map { r =>
+        var i = 0
+        while (i < numColumns) {
+          mutableRow(i) = converters(i)(r.productElement(i))
+          i += 1
         }
+
+        mutableRow
       }
     }
   }
@@ -58,26 +51,19 @@ object RDDConversions {
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], schema: StructType): RDD[Row] = {
+  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
-      if (iterator.isEmpty) {
-        Iterator.empty
-      } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new 
GenericMutableRow(bufferedIterator.head.toSeq.toArray)
-        val schemaFields = schema.fields.toArray
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r(i))
-            i += 1
-          }
-
-          mutableRow
+      val numColumns = outputTypes.length
+      val mutableRow = new GenericMutableRow(numColumns)
+      val converters = 
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      iterator.map { r =>
+        var i = 0
+        while (i < numColumns) {
+          mutableRow(i) = converters(i)(r(i))
+          i += 1
         }
+
+        mutableRow
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0c33c7b4/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index dacd967..c6a4dab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
       output: Seq[Attribute],
       rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, relation.schema)
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
     } else {
       rdd
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0c33c7b4/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index de90784..0f959b3 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputForma
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
@@ -108,7 +109,10 @@ class SimpleTextRelation(
 
     sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { 
record =>
       Row(record.split(",").zip(fields).map { case (value, dataType) =>
-        Cast(Literal(value), dataType).eval()
+        // `Cast`ed values are always of Catalyst types (i.e. UTF8String 
instead of String, etc.)
+        val catalystValue = Cast(Literal(value), dataType).eval()
+        // Here we're converting Catalyst values to Scala values to test 
`needsConversion`
+        CatalystTypeConverters.convertToScala(catalystValue, dataType)
       }: _*)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0c33c7b4/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 70328e1..7c02d56 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -76,6 +76,12 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils {
       df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
       for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
 
+    // Project many copies of columns with different types (reproduction for 
SPARK-7858)
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'b, 'b, 'b, 'p1, 'p1, 'p1, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar"))
+        yield Row(s"val_$i", s"val_$i", s"val_$i", s"val_$i", 1, 1, 1, 1))
+
     // Self-join
     df.registerTempTable("t")
     withTempTable("t") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to