This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 5d32fa98043c [SPARK-48965][SQL] Use the correct schema in `Dataset#toJSON`
5d32fa98043c is described below

commit 5d32fa98043ccd0d154d8631ba30c32005e05125
Author: Bruce Robbins <[email protected]>
AuthorDate: Wed Sep 4 21:43:28 2024 -0700

    [SPARK-48965][SQL] Use the correct schema in `Dataset#toJSON`
    
    In `Dataset#toJSON`, use the schema from `exprEnc`. This schema reflects
    any changes (e.g., decimal precision, column ordering) that `exprEnc`
    might make to input rows.
    
    `Dataset#toJSON` currently uses the schema from the logical plan, but that
    schema does not necessarily describe the rows passed to `JacksonGenerator`:
    the function passed to `mapPartitions` uses `exprEnc` to serialize the
    input, which can change the precision of decimals or rearrange columns.
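
    To see the mismatch directly, compare the plan schema with the encoder
    schema (a rough sketch, not from the PR; `Encoders.product` is just one
    way to inspect the schema that `exprEnc` derives for `Data`):
    ```
    scala> case class Data(a: BigDecimal)
    class Data

    scala> sql("select 123.456bd as a").schema.simpleString
    val res0: String = struct<a:decimal(6,3)>

    scala> org.apache.spark.sql.Encoders.product[Data].schema.simpleString
    val res1: String = struct<a:decimal(38,18)>
    ```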
    
    Here's an example that tricks `UnsafeRow#getDecimal` (called from
    `JacksonGenerator`) into mistakenly assuming the decimal is stored as a
    long:
    ```
    scala> case class Data(a: BigDecimal)
    class Data
    
    scala> sql("select 123.456bd as a").as[Data].toJSON.collect
    warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
    val res0: Array[String] = Array({"a":68719476.745})
    
    scala>
    ```
    (The bogus value above is most likely the decimal's offset-and-size word
    read back as an unscaled long: the plan schema's decimal(6,3) fits in a
    long, so `UnsafeRow#getDecimal` takes the long-backed path, while the
    encoder actually stored a byte-array-backed decimal(38,18).)

    Here's an example that tricks `JacksonGenerator` into asking for a string
    from an array and an array from a string. This case actually crashes the
    JVM:
    ```
    scala> case class Data(x: Array[Int], y: String)
    class Data
    
    scala> sql("select repeat('Hey there', 17) as y, array_repeat(22, 17) as 
x").as[Data].toJSON.collect
    warning: 1 deprecation (since 2.13.3); for details, enable `:setting 
-deprecation` or `:replay -deprecation`
    Exception in task 0.0 in stage 0.0 (TID 0)
    java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
            at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$5(JacksonGenerator.scala:129) ~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
            at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$5$adapted(JacksonGenerator.scala:128) ~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
            at org.apache.spark.sql.catalyst.json.JacksonGenerator.writeArrayData(JacksonGenerator.scala:258) ~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
            at org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23(JacksonGenerator.scala:201) ~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
            at org.apache.spark.sql.catalyst.json.JacksonGenerator.writeArray(JacksonGenerator.scala:249) ~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
    ...
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.base/java.lang.Thread.run(Thread.java:833)
    
    bash-3.2$
    ```
    Both these cases work correctly without `toJSON`.
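
    For reference, a sketch of the first case without `toJSON` (REPL echo
    approximated; the decimal comes back at the encoder's scale of 18):
    ```
    scala> sql("select 123.456bd as a").as[Data].collect
    val res0: Array[Data] = Array(Data(123.456000000000000000))
    ```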
    
    Before the PR, converting the DataFrame to a Dataset of a tuple type
    preserved the original column names in the JSON strings:
    ```
    scala> sql("select 123.456d as a, 12 as b").as[(Double, Int)].toJSON.collect
    warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
    val res0: Array[String] = Array({"a":123.456,"b":12})
    
    scala>
    ```
    After the PR, the JSON strings use the field names from the tuple class:
    ```
    scala> sql("select 123.456d as a, 12 as b").as[(Double, Int)].toJSON.collect
    warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
    val res1: Array[String] = Array({"_1":123.456,"_2":12})
    
    scala>
    ```
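
    Calling `toJSON` on the untyped DataFrame (no `.as[(Double, Int)]`) should
    still produce the original column names, since the row encoder's schema
    matches the plan schema (sketch, output approximated):
    ```
    scala> sql("select 123.456d as a, 12 as b").toJSON.collect
    val res2: Array[String] = Array({"a":123.456,"b":12})
    ```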
    
    Tested with new unit tests in `JsonSuite`.
    
    No generative AI tooling was used in authoring this patch.
    
    Closes #47982 from bersprockets/to_json_issue.
    
    Authored-by: Bruce Robbins <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 5375ce2acfe206eb64fb8bede44fe47c643fcd46)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  2 +-
 .../sql/execution/datasources/json/JsonSuite.scala | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3f43bccda7ab..f611208e1cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3926,7 +3926,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def toJSON: Dataset[String] = {
-    val rowSchema = this.schema
+    val rowSchema = exprEnc.schema
     val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
     mapPartitions { iter =>
       val writer = new CharArrayWriter()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index bbcda1df0339..b1b9a065986a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, Da
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.SQLTestData.{DecimalData, TestData}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.StructType.fromDDL
 import org.apache.spark.sql.types.TestUDT.{MyDenseVector, MyDenseVectorUDT}
@@ -3654,6 +3655,34 @@ abstract class JsonSuite
     assert(JSONOptions.getAlternativeOption("charset").contains("encoding"))
     assert(JSONOptions.getAlternativeOption("dateFormat").isEmpty)
   }
+
+  test("SPARK-48965: Dataset#toJSON should use correct schema #1: decimals") {
+    val numString = "123.456"
+    val bd = BigDecimal(numString)
+    val ds1 = sql(s"select ${numString}bd as a, ${numString}bd as b").as[DecimalData]
+    checkDataset(
+      ds1,
+      DecimalData(bd, bd)
+    )
+    val ds2 = ds1.toJSON
+    checkDataset(
+      ds2,
+      "{\"a\":123.456000000000000000,\"b\":123.456000000000000000}"
+    )
+  }
+
+  test("SPARK-48965: Dataset#toJSON should use correct schema #2: misaligned 
columns") {
+    val ds1 = sql("select 'Hey there' as value, 90000001 as key").as[TestData]
+    checkDataset(
+      ds1,
+      TestData(90000001, "Hey there")
+    )
+    val ds2 = ds1.toJSON
+    checkDataset(
+      ds2,
+      "{\"key\":90000001,\"value\":\"Hey there\"}"
+    )
+  }
 }
 
 class JsonV1Suite extends JsonSuite {

