This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 205b65e  [SPARK-33134][SQL][3.0] Return partial results only for root JSON objects
205b65e is described below

commit 205b65e35c45d3e4f5b3f798a3b48410e94ab2c5
Author: Max Gekk <[email protected]>
AuthorDate: Wed Oct 14 12:14:32 2020 +0900

    [SPARK-33134][SQL][3.0] Return partial results only for root JSON objects
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to restrict the partial result feature to root JSON objects only. The JSON datasource as well as `from_json()` will return `null` for malformed nested JSON objects.
    
    ### Why are the changes needed?
    1. To not raise exceptions to users in the PERMISSIVE mode
    2. To fix a regression and to have the same behavior as Spark 2.4.x
    3. The current implementation of partial results is supposed to work only for root (top-level) JSON objects, and it has not been tested for malformed nested complex JSON fields.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Before the changes, the code below:
    ```scala
        val pokerhand_raw = Seq("""[{"cards": [19], "playerId": 123456}]""").toDF("events")
        val event = new StructType().add("playerId", LongType).add("cards", ArrayType(new StructType().add("id", LongType).add("rank", StringType)))
        val pokerhand_events = pokerhand_raw.select(from_json($"events", ArrayType(event)).as("event"))
        pokerhand_events.show
    ```
    throws an exception even in the default **PERMISSIVE** mode:
    ```java
    java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
      at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
      at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
      at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
    ```
    
    After the changes:
    ```
    +-----+
    |event|
    +-----+
    | null|
    +-----+
    ```
    
    ### How was this patch tested?
    Added a test to `JsonFunctionsSuite`.
    
    Closes #30032 from MaxGekk/json-skip-row-wrong-schema-3.0.
    
    Authored-by: Max Gekk <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../org/apache/spark/sql/catalyst/json/JacksonParser.scala |  7 ++++---
 .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala    | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index e038f77..70f7f8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -86,7 +86,7 @@ class JacksonParser(
     val elementConverter = makeConverter(st)
     val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
     (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
-      case START_OBJECT => Some(convertObject(parser, st, fieldConverters))
+      case START_OBJECT => Some(convertObject(parser, st, fieldConverters, isRoot = true))
        // SPARK-3308: support reading top level JSON arrays and take every element
         // in such an array as a row
         //
@@ -375,7 +375,8 @@ class JacksonParser(
   private def convertObject(
       parser: JsonParser,
       schema: StructType,
-      fieldConverters: Array[ValueConverter]): InternalRow = {
+      fieldConverters: Array[ValueConverter],
+      isRoot: Boolean = false): InternalRow = {
     val row = new GenericInternalRow(schema.length)
     var badRecordException: Option[Throwable] = None
 
@@ -386,7 +387,7 @@ class JacksonParser(
             row.update(index, fieldConverters(index).apply(parser))
           } catch {
             case e: SparkUpgradeException => throw e
-            case NonFatal(e) =>
+            case NonFatal(e) if isRoot =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
           }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index a4ed7cc..5e3931c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -712,4 +712,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
          | """.stripMargin)
     checkAnswer(toDF("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), toDF("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"))
   }
+
+  test("SPARK-33134: return partial results only for root JSON objects") {
+    val st = new StructType()
+      .add("c1", LongType)
+      .add("c2", ArrayType(new StructType().add("c3", LongType).add("c4", StringType)))
+    val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
+    checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
+    val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
+    checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+    val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
+    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+    val df4 = Seq("""{"c2": [19]}""").toDF("c0")
+    checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
