Repository: spark Updated Branches: refs/heads/master 0463428b6 -> 03668348e
[SPARK-7637] [SQL] O(N) merge implementation for StructType merge Contribution is my original work and I license the work to the project under the project's open source license. Author: rowan <[email protected]> Closes #6259 from rowan000/SPARK-7637 and squashes the following commits: c479df4 [rowan] SPARK-7637: rename mapFields to fieldsMap as per comments on github. 8d2e419 [rowan] SPARK-7637: fix up whitespace changes 0e9d662 [rowan] SPARK-7637: O(N) merge implementation for StructType merge Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03668348 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03668348 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03668348 Branch: refs/heads/master Commit: 03668348e29eb52c1a7d57a1e0ed7fca6c323890 Parents: 0463428 Author: rowan <[email protected]> Authored: Tue May 26 18:17:16 2015 -0700 Committer: Michael Armbrust <[email protected]> Committed: Tue May 26 18:17:16 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/types/StructType.scala | 12 +++- .../apache/spark/sql/types/DataTypeSuite.scala | 73 +++++++++++++++++++- 2 files changed, 81 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/03668348/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 7e00a27..a4f30c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -230,10 +230,10 @@ object StructType { case (StructType(leftFields), StructType(rightFields)) => val newFields = 
ArrayBuffer.empty[StructField] + val rightMapped = fieldsMap(rightFields) leftFields.foreach { case leftField @ StructField(leftName, leftType, leftNullable, _) => - rightFields - .find(_.name == leftName) + rightMapped.get(leftName) .map { case rightField @ StructField(_, rightType, rightNullable, _) => leftField.copy( dataType = merge(leftType, rightType), @@ -243,8 +243,9 @@ object StructType { .foreach(newFields += _) } + val leftMapped = fieldsMap(leftFields) rightFields - .filterNot(f => leftFields.map(_.name).contains(f.name)) + .filterNot(f => leftMapped.get(f.name).nonEmpty) .foreach(newFields += _) StructType(newFields) @@ -264,4 +265,9 @@ object StructType { case _ => throw new SparkException(s"Failed to merge incompatible data types $left and $right") } + + private[sql] def fieldsMap(fields: Array[StructField]): Map[String, StructField] = { + import scala.collection.breakOut + fields.map(s => (s.name, s))(breakOut) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/03668348/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala index d797510..a73317c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.types +import org.apache.spark.SparkException import org.scalatest.FunSuite class DataTypeSuite extends FunSuite { @@ -69,6 +70,76 @@ class DataTypeSuite extends FunSuite { } } + test("fieldsMap returns map of name to StructField") { + val struct = StructType( + StructField("a", LongType) :: + StructField("b", FloatType) :: Nil) + + val mapped = StructType.fieldsMap(struct.fields) + + val expected = Map( + "a" -> StructField("a", 
LongType), + "b" -> StructField("b", FloatType)) + + assert(mapped === expected) + } + + test("merge where right is empty") { + val left = StructType( + StructField("a", LongType) :: + StructField("b", FloatType) :: Nil) + + val right = StructType(List()) + val merged = left.merge(right) + + assert(merged === left) + } + + test("merge where left is empty") { + + val left = StructType(List()) + + val right = StructType( + StructField("a", LongType) :: + StructField("b", FloatType) :: Nil) + + val merged = left.merge(right) + + assert(right === merged) + + } + + test("merge where both are non-empty") { + val left = StructType( + StructField("a", LongType) :: + StructField("b", FloatType) :: Nil) + + val right = StructType( + StructField("c", LongType) :: Nil) + + val expected = StructType( + StructField("a", LongType) :: + StructField("b", FloatType) :: + StructField("c", LongType) :: Nil) + + val merged = left.merge(right) + + assert(merged === expected) + } + + test("merge where right contains type conflict") { + val left = StructType( + StructField("a", LongType) :: + StructField("b", FloatType) :: Nil) + + val right = StructType( + StructField("b", LongType) :: Nil) + + intercept[SparkException] { + left.merge(right) + } + } + def checkDataTypeJsonRepr(dataType: DataType): Unit = { test(s"JSON - $dataType") { assert(DataType.fromJson(dataType.json) === dataType) @@ -120,7 +191,7 @@ class DataTypeSuite extends FunSuite { checkDefaultSize(DecimalType(10, 5), 4096) checkDefaultSize(DecimalType.Unlimited, 4096) checkDefaultSize(DateType, 4) - checkDefaultSize(TimestampType,12) + checkDefaultSize(TimestampType, 12) checkDefaultSize(StringType, 4096) checkDefaultSize(BinaryType, 4096) checkDefaultSize(ArrayType(DoubleType, true), 800) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
