This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 291f4a3e3db9 [SPARK-51935][SQL] Fix lazy behavior of iterators in 
interpreted df.collect()
291f4a3e3db9 is described below

commit 291f4a3e3db92dac6a56df5c82b68a922cc49951
Author: Harsh Motwani <harsh.motw...@databricks.com>
AuthorDate: Tue Apr 29 08:44:06 2025 +0800

    [SPARK-51935][SQL] Fix lazy behavior of iterators in interpreted 
df.collect()
    
    ### What changes were proposed in this pull request?
    
    Prior to this PR, in a very specific codepath in the non-SQL expression 
`MapObjects`, an iterator is returned by df.collect() when codegen is disabled. 
This can lead to non-deterministic behavior when codegen is disabled. For 
example the following code could return incorrect results:
    ```
    spark.conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")
    
    import org.apache.spark.sql.Row
    
    case class Posting(transfer_type: String)
    
    val df3 = spark.range(0, 8, 1, 1).selectExpr("""
    case when id = 0 then array(named_struct('a', '1'), named_struct('a', '3'))
    when id = 1 then null
    when id = 2 then null
    when id = 3 then null
    when id = 4 then null
    when id = 5 then null
    when id = 6 then array(named_struct('a', '5'), named_struct('a', '7'))
    else array(named_struct('a', '9'), named_struct('a', '11'))
    end result
    """)
    
    val res6 = df3.select("result").map(row =>
      if (row.get(0) != null) row.get(0).asInstanceOf[Seq[Row]].map(v => 
Posting(v.get(0).asInstanceOf[String]))
      else null
    ).collect()
    ```
    
    ### Why are the changes needed?
    
    The previous code that returned an iterator could give incorrect results 
when a DataFrame represents scala case classes. This PR fixes it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Users could see more deterministic behavior when forcing Interpreted mode 
and using scala objects in DataFrames.
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50741 from harshmotw-db/harsh-motwani_data/SPARK-51935.
    
    Authored-by: Harsh Motwani <harsh.motw...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/expressions/objects/objects.scala     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 90ab8725e553..28e934b5df95 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -992,7 +992,7 @@ case class MapObjects private(
       }
     case Some(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
       // Scala sequence
-      executeFuncOnCollection(_).toSeq
+      executeFuncOnCollection(_).toIndexedSeq
     case Some(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
       // Scala set
       executeFuncOnCollection(_).toSet


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to