Repository: spark
Updated Branches:
  refs/heads/branch-1.1 0d9723309 -> 8b5af6f74


[SPARK-3036][SPARK-3037][SQL] Add MapType/ArrayType containing null value support to Parquet.

JIRA:
- https://issues.apache.org/jira/browse/SPARK-3036
- https://issues.apache.org/jira/browse/SPARK-3037

This PR currently uses the following Parquet schema for `MapType` when 
`valueContainsNull` is `true`:

```
message root {
  optional group a (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required int32 key;
      optional int32 value;
    }
  }
}
```
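
As a quick illustration of the `MapType` case, here is a minimal round-trip sketch against the Spark 1.1 `SchemaRDD` API (the `SparkContext` `sc`, the record type, and the output path are illustrative assumptions; `valueContainsNull` is inferred as `true` for `Option`-valued maps, as in the test case in this PR):

```scala
import org.apache.spark.sql.SQLContext

// Hypothetical record type: map values are Option, so some may be null.
case class MapRecord(a: Map[Int, Option[Long]])

val sqlContext = new SQLContext(sc)  // `sc` is an existing SparkContext
import sqlContext.createSchemaRDD    // implicit RDD -> SchemaRDD conversion

// `2 -> None` is written as a null value, which needs the optional `value` field.
val rdd = sc.parallelize(Seq(MapRecord(Map(1 -> Some(1L), 2 -> None))))
rdd.saveAsParquetFile("/tmp/map_with_nulls")  // hypothetical path
sqlContext.parquetFile("/tmp/map_with_nulls").collect()
```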

For `ArrayType` when `containsNull` is `true`, it uses:

```
message root {
  optional group a (LIST) {
    repeated group bag {
      optional int32 array;
    }
  }
}
```
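
And the analogous sketch for the `ArrayType` case (reusing `sqlContext` and the imports from the sketch above; `containsNull` is inferred as `true` for `Option`-typed elements):

```scala
// Hypothetical record type: array elements are Option, so some may be null.
case class ArrayRecord(a: Seq[Option[Int]])

// The None element is written as a null inside the repeated "bag" group above.
val arrayRdd = sc.parallelize(Seq(ArrayRecord(Seq(Some(1), None, Some(3)))))
arrayRdd.saveAsParquetFile("/tmp/array_with_nulls")  // hypothetical path
sqlContext.parquetFile("/tmp/array_with_nulls").collect()
```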

We still need to think about compatibility with older versions of Spark, and 
with Hive and the other systems mentioned in the JIRA issues.

Note:
This PR is based on #1963 and #1889.
Please review those first.

/cc marmbrus, yhuai

Author: Takuya UESHIN <[email protected]>

Closes #2032 from ueshin/issues/SPARK-3036_3037 and squashes the following 
commits:

4e8e9e7 [Takuya UESHIN] Add ArrayType containing null value support to Parquet.
013c2ca [Takuya UESHIN] Add MapType containing null value support to Parquet.
62989de [Takuya UESHIN] Merge branch 'issues/SPARK-2969' into issues/SPARK-3036_3037
8e38b53 [Takuya UESHIN] Merge branch 'issues/SPARK-3063' into issues/SPARK-3036_3037

(cherry picked from commit 727cb25bcc29481d6b744abef1ca091e64b5f91f)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b5af6f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b5af6f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b5af6f7

Branch: refs/heads/branch-1.1
Commit: 8b5af6f7494fbe83f5b48e72bcbcb636b96cfe75
Parents: 0d97233
Author: Takuya UESHIN <[email protected]>
Authored: Tue Aug 26 18:28:41 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Tue Aug 26 18:28:52 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetConverter.scala    | 83 ++++++++++++++++++++
 .../spark/sql/parquet/ParquetTableSupport.scala | 54 ++++++++-----
 .../apache/spark/sql/parquet/ParquetTypes.scala | 54 +++++++++----
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 16 +++-
 4 files changed, 167 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b5af6f7/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index ef4526e..9fd6aed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -58,6 +58,7 @@ private[sql] object CatalystConverter {
   // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
   // Note that "array" for the array elements is chosen by ParquetAvro.
   // Using a different value will result in Parquet silently dropping columns.
+  val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
   val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
   val MAP_KEY_SCHEMA_NAME = "key"
   val MAP_VALUE_SCHEMA_NAME = "value"
@@ -82,6 +83,9 @@ private[sql] object CatalystConverter {
       case ArrayType(elementType: DataType, false) => {
         new CatalystArrayConverter(elementType, fieldIndex, parent)
       }
+      case ArrayType(elementType: DataType, true) => {
+        new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent)
+      }
       case StructType(fields: Seq[StructField]) => {
         new CatalystStructConverter(fields.toArray, fieldIndex, parent)
       }
@@ -568,6 +572,85 @@ private[parquet] class CatalystNativeArrayConverter(
 }
 
 /**
+ * A `parquet.io.api.GroupConverter` that converts single-element groups that
+ * match the characteristics of an array containing null values (see
+ * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an
+ * [[org.apache.spark.sql.catalyst.types.ArrayType]].
+ *
+ * @param elementType The type of the array elements (complex or primitive)
+ * @param index The position of this (array) field inside its parent converter
+ * @param parent The parent converter
+ * @param buffer A data buffer
+ */
+private[parquet] class CatalystArrayContainsNullConverter(
+    val elementType: DataType,
+    val index: Int,
+    protected[parquet] val parent: CatalystConverter,
+    protected[parquet] var buffer: Buffer[Any])
+  extends CatalystConverter {
+
+  def this(elementType: DataType, index: Int, parent: CatalystConverter) =
+    this(
+      elementType,
+      index,
+      parent,
+      new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE))
+
+  protected[parquet] val converter: Converter = new CatalystConverter {
+
+    private var current: Any = null
+
+    val converter = CatalystConverter.createConverter(
+      new CatalystConverter.FieldType(
+        CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+        elementType,
+        false),
+      fieldIndex = 0,
+      parent = this)
+
+    override def getConverter(fieldIndex: Int): Converter = converter
+
+    override def end(): Unit = parent.updateField(index, current)
+
+    override def start(): Unit = {
+      current = null
+    }
+
+    override protected[parquet] val size: Int = 1
+    override protected[parquet] val index: Int = 0
+    override protected[parquet] val parent = CatalystArrayContainsNullConverter.this
+
+    override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+      current = value
+    }
+
+    override protected[parquet] def clearBuffer(): Unit = {}
+  }
+
+  override def getConverter(fieldIndex: Int): Converter = converter
+
+  // arrays have only one (repeated) field, which is its elements
+  override val size = 1
+
+  override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = {
+    buffer += value
+  }
+
+  override protected[parquet] def clearBuffer(): Unit = {
+    buffer.clear()
+  }
+
+  override def start(): Unit = {}
+
+  override def end(): Unit = {
+    assert(parent != null)
+    // here we need to make sure to use ArrayScalaType
+    parent.updateField(index, buffer.toArray.toSeq)
+    clearBuffer()
+  }
+}
+
+/**
  * This converter is for multi-element groups of primitive or complex types
  * that have repetition level optional or required (so struct fields).
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/8b5af6f7/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 6a657c2..bdf0240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -173,7 +173,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
     if (value != null) {
       schema match {
-        case t @ ArrayType(_, false) => writeArray(
+        case t @ ArrayType(_, _) => writeArray(
           t,
           value.asInstanceOf[CatalystConverter.ArrayScalaType[_]])
         case t @ MapType(_, _, _) => writeMap(
@@ -228,45 +228,57 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     }
   }
 
-  // TODO: support null values, see
-  // https://issues.apache.org/jira/browse/SPARK-1649
   private[parquet] def writeArray(
       schema: ArrayType,
       array: CatalystConverter.ArrayScalaType[_]): Unit = {
     val elementType = schema.elementType
     writer.startGroup()
     if (array.size > 0) {
-      writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-      var i = 0
-      while(i < array.size) {
-        writeValue(elementType, array(i))
-        i = i + 1
+      if (schema.containsNull) {
+        writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
+        var i = 0
+        while (i < array.size) {
+          writer.startGroup()
+          if (array(i) != null) {
+            writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+            writeValue(elementType, array(i))
+            writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+          }
+          writer.endGroup()
+          i = i + 1
+        }
+        writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
+      } else {
+        writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
+        var i = 0
+        while (i < array.size) {
+          writeValue(elementType, array(i))
+          i = i + 1
+        }
+        writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
       }
-      writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
     }
     writer.endGroup()
   }
 
-  // TODO: support null values, see
-  // https://issues.apache.org/jira/browse/SPARK-1649
   private[parquet] def writeMap(
       schema: MapType,
       map: CatalystConverter.MapScalaType[_, _]): Unit = {
     writer.startGroup()
     if (map.size > 0) {
       writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-      writer.startGroup()
-      writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-      for(key <- map.keys) {
+      for ((key, value) <- map) {
+        writer.startGroup()
+        writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
         writeValue(schema.keyType, key)
+        writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
+        if (value != null) {
+          writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+          writeValue(schema.valueType, value)
+          writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
+        }
+        writer.endGroup()
       }
-      writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-      writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-      for(value <- map.values) {
-        writeValue(schema.valueType, value)
-      }
-      writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-      writer.endGroup()
       writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
     }
     writer.endGroup()

http://git-wip-us.apache.org/repos/asf/spark/blob/8b5af6f7/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index af8cd0a..1a52377 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -119,7 +119,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
         case ParquetOriginalType.LIST => { // TODO: check enums!
           assert(groupType.getFieldCount == 1)
           val field = groupType.getFields.apply(0)
-          ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+          if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
+            val bag = field.asGroupType()
+            assert(bag.getFieldCount == 1)
+            ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+          } else {
+            ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+          }
         }
         case ParquetOriginalType.MAP => {
           assert(
@@ -129,28 +135,32 @@ private[parquet] object ParquetTypesConverter extends Logging {
           assert(
             keyValueGroup.getFieldCount == 2,
             "Parquet Map type malformatted: nested group should have 2 (key, 
value) fields!")
-          val keyType = toDataType(keyValueGroup.getFields.apply(0), 
isBinaryAsString)
           assert(keyValueGroup.getFields.apply(0).getRepetition == 
Repetition.REQUIRED)
+
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
-          assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
-          // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
-          // at here.
-          MapType(keyType, valueType)
+          MapType(keyType, valueType,
+            keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
         }
         case _ => {
           // Note: the order of these checks is important!
           if (correspondsToMap(groupType)) { // MapType
             val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-            val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
             assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
+
+            val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
             val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
-            assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
-            // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
-            // at here.
-            MapType(keyType, valueType)
+            MapType(keyType, valueType,
+              keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
           } else if (correspondsToArray(groupType)) { // ArrayType
-            val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
-            ArrayType(elementType, containsNull = false)
+            val field = groupType.getFields.apply(0)
+            if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
+              val bag = field.asGroupType()
+              assert(bag.getFieldCount == 1)
+              ArrayType(toDataType(bag.getFields.apply(0), isBinaryAsString), containsNull = true)
+            } else {
+              ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
+            }
           } else { // everything else: StructType
             val fields = groupType
               .getFields
@@ -249,13 +259,27 @@ private[parquet] object ParquetTypesConverter extends Logging {
             inArray = true)
           ConversionPatterns.listType(repetition, name, parquetElementType)
         }
+        case ArrayType(elementType, true) => {
+          val parquetElementType = fromDataType(
+            elementType,
+            CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+            nullable = true,
+            inArray = false)
+          ConversionPatterns.listType(
+            repetition,
+            name,
+            new ParquetGroupType(
+              Repetition.REPEATED,
+              CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME,
+              parquetElementType))
+        }
         case StructType(structFields) => {
           val fields = structFields.map {
            field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
           }
           new ParquetGroupType(repetition, name, fields)
         }
-        case MapType(keyType, valueType, _) => {
+        case MapType(keyType, valueType, valueContainsNull) => {
           val parquetKeyType =
             fromDataType(
               keyType,
@@ -266,7 +290,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
             fromDataType(
               valueType,
               CatalystConverter.MAP_VALUE_SCHEMA_NAME,
-              nullable = false,
+              nullable = valueContainsNull,
               inArray = false)
           ConversionPatterns.mapType(
             repetition,

http://git-wip-us.apache.org/repos/asf/spark/blob/8b5af6f7/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 28f43b3..4219cc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -78,7 +78,9 @@ case class AllDataTypesWithNonPrimitiveType(
     booleanField: Boolean,
     binaryField: Array[Byte],
     array: Seq[Int],
-    map: Map[Int, String],
+    arrayContainsNull: Seq[Option[Int]],
+    map: Map[Int, Long],
+    mapValueContainsNull: Map[Int, Option[Long]],
     data: Data)
 
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
@@ -287,7 +289,11 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 
== 0,
         (0 to x).map(_.toByte).toArray,
-        (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
+        (0 until x),
+        (0 until x).map(Option(_).filter(_ % 3 == 0)),
+        (0 until x).map(i => i -> i.toLong).toMap,
+        (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
+        Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -302,8 +308,10 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
         assert(result(i).getBoolean(7) === (i % 2 == 0))
         assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
         assert(result(i)(9) === (0 until i))
-        assert(result(i)(10) === (0 until i).map(i => i -> s"$i").toMap)
-        assert(result(i)(11) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
+        assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null))
+        assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap)
+        assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null))
+        assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }
 

