This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b8c2aa48b8a1 [SPARK-55819][SQL] Refactor ExpandExec to be more succinct
b8c2aa48b8a1 is described below

commit b8c2aa48b8a13af82d01063f487feda9302644d1
Author: Yuchen Liu <[email protected]>
AuthorDate: Wed Mar 4 20:55:27 2026 +0800

    [SPARK-55819][SQL] Refactor ExpandExec to be more succinct
    
    ### What changes were proposed in this pull request?
    
    The implementation of `ExpandExec` is unnecessarily convoluted, which makes
    such simple logic hard to understand. This PR simplifies it by replacing the
    hand-written iterator with native `Iterator` transformations (`flatMap` and
    `map`).
    
    ### Why are the changes needed?
    
    It makes the code shorter and easier to understand.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests. (Also verified manually, and asked Claude to verify.)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54601 from eason-yuchen-liu/refactorExpandExec.
    
    Authored-by: Yuchen Liu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/execution/ExpandExec.scala    | 25 +++-------------------
 1 file changed, 3 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index 1b9432047d9d..254772f73208 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -59,29 +59,10 @@ case class ExpandExec(
     child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
       val groups = projections.map(projection).toArray
       groups.foreach(_.initialize(index))
-      new Iterator[InternalRow] {
-        private[this] var result: InternalRow = _
-        private[this] var idx = -1  // -1 means the initial state
-        private[this] var input: InternalRow = _
-
-        override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext
-
-        override final def next(): InternalRow = {
-          if (idx <= 0) {
-            // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple
-            input = iter.next()
-            idx = 0
-          }
-
-          result = groups(idx)(input)
-          idx += 1
-
-          if (idx == groups.length && iter.hasNext) {
-            idx = 0
-          }
-
+      iter.flatMap { input =>
+        groups.iterator.map { group =>
           numOutputRows += 1
-          result
+          group(input)
         }
       }
     }
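
As a rough illustration of why the two versions in the diff above should produce the same rows, the following stand-alone sketch compares a hand-written state machine modelled on the removed code with the `flatMap`/`map` form. It uses plain Scala collections rather than Spark: `Row`, `ExpandSketch`, `expandManual`, and `expandFlatMap` are hypothetical names introduced here, and the three projection functions are stand-ins for the real generated projections. The `numOutputRows` metric is left out.

```scala
// Stand-alone sketch: plain Scala, no Spark dependency.
// `Row` is a hypothetical stand-in for InternalRow, and each `Row => Row`
// function stands in for one projection in `groups`.
object ExpandSketch {
  type Row = Seq[Int]

  // Hand-written state machine, modelled on the code removed by the diff.
  def expandManual(iter: Iterator[Row], groups: Array[Row => Row]): Iterator[Row] =
    new Iterator[Row] {
      private var idx = -1 // -1 means the initial state
      private var input: Row = null

      override def hasNext: Boolean =
        (-1 < idx && idx < groups.length) || iter.hasNext

      override def next(): Row = {
        if (idx <= 0) {
          // Initial state or start of a new input row: fetch the next input.
          input = iter.next()
          idx = 0
        }
        val result = groups(idx)(input)
        idx += 1
        // All projections applied to this input: reset if more input remains.
        if (idx == groups.length && iter.hasNext) idx = 0
        result
      }
    }

  // The same expansion written with Iterator combinators, as in the added code.
  def expandFlatMap(iter: Iterator[Row], groups: Array[Row => Row]): Iterator[Row] =
    iter.flatMap(input => groups.iterator.map(group => group(input)))

  def main(args: Array[String]): Unit = {
    val groups: Array[Row => Row] = Array(
      (row: Row) => row,             // keep the row unchanged
      (row: Row) => row.map(_ => 0), // zero every column (stand-in for nulling)
      (row: Row) => row.reverse      // reorder the columns
    )
    val rows = List(Seq(1, 2), Seq(3, 4))

    val manual = expandManual(rows.iterator, groups).toList
    val viaFlatMap = expandFlatMap(rows.iterator, groups).toList

    // Both versions emit every projection of a row before moving to the next row.
    assert(manual == viaFlatMap)
    assert(viaFlatMap.length == rows.length * groups.length)
    println(viaFlatMap)
  }
}
```

One edge case the sketch does not exercise: with an empty `groups` array, `expandManual` would fail on `groups(0)` as soon as an input row arrives, while `expandFlatMap` simply yields nothing. Whether that case can occur in `ExpandExec` itself is not stated in this commit.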

