Repository: spark Updated Branches: refs/heads/master ca9ef86c8 -> 917f4000b
[SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in the same field ## What changes were proposed in this pull request? The PR https://github.com/apache/spark/pull/2400 added support for parsing JSON rows wrapped in an array. However, this throws an exception when the given data contains array data and struct data in the same field as below: ```json {"a": {"b": 1}} {"a": []} ``` and the schema is given as below: ```scala val schema = StructType( StructField("a", StructType( StructField("b", StringType) :: Nil )) :: Nil) ``` - **Before** ```scala sqlContext.read.schema(schema).json(path).show() ``` ```scala Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 10, 192.168.1.170): java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50) at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source) ... ``` - **After** ```scala sqlContext.read.schema(schema).json(path).show() ``` ```bash +----+ | a| +----+ | [1]| |null| +----+ ``` For other mismatched data types, the given values are converted to `null`; only this case throws an exception. This PR applies the support for array-wrapped rows only at the top level. ## How was this patch tested? Unit tests were added, and `./dev/run_tests` was used for code style checks. Author: hyukjinkwon <[email protected]> Closes #11752 from HyukjinKwon/SPARK-3308-follow-up. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/917f4000 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/917f4000 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/917f4000 Branch: refs/heads/master Commit: 917f4000b418ee0997551d4dfabb369c4e9243a9 Parents: ca9ef86 Author: hyukjinkwon <[email protected]> Authored: Wed Mar 16 18:20:30 2016 -0700 Committer: Yin Huai <[email protected]> Committed: Wed Mar 16 18:20:30 2016 -0700 ---------------------------------------------------------------------- .../datasources/json/JSONRelation.scala | 1 - .../datasources/json/JacksonParser.scala | 37 +++++++++++++------- .../execution/datasources/json/JsonSuite.scala | 19 +++++++++- .../datasources/json/TestJsonData.scala | 5 +++ 4 files changed, 48 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 3fa5ebf..751d78d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json import java.io.CharArrayWriter import com.fasterxml.jackson.core.JsonFactory -import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{LongWritable, NullWritable, Text} import org.apache.hadoop.mapred.{JobConf, TextInputFormat} 
http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index b2f5c1e..3252b6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -49,8 +49,31 @@ object JacksonParser { /** * Parse the current token (and related children) according to a desired schema + * This is an wrapper for the method `convertField()` to handle a row wrapped + * with an array. */ - def convertField( + def convertRootField( + factory: JsonFactory, + parser: JsonParser, + schema: DataType): Any = { + import com.fasterxml.jackson.core.JsonToken._ + (parser.getCurrentToken, schema) match { + case (START_ARRAY, st: StructType) => + // SPARK-3308: support reading top level JSON arrays and take every element + // in such an array as a row + convertArray(factory, parser, st) + + case (START_OBJECT, ArrayType(st, _)) => + // the business end of SPARK-3308: + // when an object is found but an array is requested just wrap it in a list + convertField(factory, parser, st) :: Nil + + case _ => + convertField(factory, parser, schema) + } + } + + private def convertField( factory: JsonFactory, parser: JsonParser, schema: DataType): Any = { @@ -157,19 +180,9 @@ object JacksonParser { case (START_OBJECT, st: StructType) => convertObject(factory, parser, st) - case (START_ARRAY, st: StructType) => - // SPARK-3308: support reading top level JSON arrays and take every element - // in such an array as a row - convertArray(factory, parser, st) - case (START_ARRAY, ArrayType(st, _)) => convertArray(factory, 
parser, st) - case (START_OBJECT, ArrayType(st, _)) => - // the business end of SPARK-3308: - // when an object is found but an array is requested just wrap it in a list - convertField(factory, parser, st) :: Nil - case (START_OBJECT, MapType(StringType, kt, _)) => convertMap(factory, parser, kt) @@ -264,7 +277,7 @@ object JacksonParser { Utils.tryWithResource(factory.createParser(record)) { parser => parser.nextToken() - convertField(factory, parser, schema) match { + convertRootField(factory, parser, schema) match { case null => failedRecord(record) case row: InternalRow => row :: Nil case array: ArrayData => http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4a8c128..6d942c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { Utils.tryWithResource(factory.createParser(writer.toString)) { parser => parser.nextToken() - JacksonParser.convertField(factory, parser, dataType) + JacksonParser.convertRootField(factory, parser, dataType) } } @@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("Parse JSON rows having an array type and a struct type in the same field.") { + withTempDir { dir => + val dir = Utils.createTempDir() + dir.delete() + val path = dir.getCanonicalPath + arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) + + val schema = + StructType( + StructField("a", StructType( + 
StructField("b", StringType) :: Nil + )) :: Nil) + val jsonDF = sqlContext.read.schema(schema).json(path) + assert(jsonDF.count() == 2) + } + } + test("SPARK-12872 Support to specify the option for compression codec") { withTempDir { dir => val dir = Utils.createTempDir() http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index a083605..b2eff81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -209,6 +209,11 @@ private[json] trait TestJsonData { sqlContext.sparkContext.parallelize( """{"ts":1451732645}""" :: Nil) + def arrayAndStructRecords: RDD[String] = + sqlContext.sparkContext.parallelize( + """{"a": {"b": 1}}""" :: + """{"a": []}""" :: Nil) + lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil) def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
