This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 32fc10c35c4 [SPARK-41886][CONNECT][PYTHON] `DataFrame.intersect` doctest output has different order
32fc10c35c4 is described below

commit 32fc10c35c48f96cce1d1d713c148572755a01e6
Author: Jiaan Geng <[email protected]>
AuthorDate: Tue Jan 10 22:47:08 2023 +0800

    [SPARK-41886][CONNECT][PYTHON] `DataFrame.intersect` doctest output has different order
    
    ### What changes were proposed in this pull request?
    File 
"/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/dataframe.py", line 
609, in pyspark.sql.connect.dataframe.DataFrame.intersect
    Failed example:
        df1.intersect(df2).show()
    Expected:
    ```
        +---+---+
        | C1| C2|
        +---+---+
        |  b|  3|
        |  a|  1|
        +---+---+
    ```
    Got:
    ```
        +---+---+
        | C1| C2|
        +---+---+
        |  a|  1|
        |  b|  3|
        +---+---+
        <BLANKLINE>
    ```
    
    After my investigation, the root cause is that PySpark and Spark Connect produce different physical plans.
    The plan produced by PySpark is shown below.
    ```
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- HashAggregate(keys=[C1#2603, C2#2604L], functions=[])
       +- Exchange hashpartitioning(C1#2603, C2#2604L, 200), ENSURE_REQUIREMENTS, [plan_id=11257]
          +- HashAggregate(keys=[C1#2603, C2#2604L], functions=[])
             +- SortMergeJoin [coalesce(C1#2603, ), isnull(C1#2603), coalesce(C2#2604L, 0), isnull(C2#2604L)], [coalesce(C1#2607, ), isnull(C1#2607), coalesce(C2#2608L, 0), isnull(C2#2608L)], LeftSemi
                :- Sort [coalesce(C1#2603, ) ASC NULLS FIRST, isnull(C1#2603) ASC NULLS FIRST, coalesce(C2#2604L, 0) ASC NULLS FIRST, isnull(C2#2604L) ASC NULLS FIRST], false, 0
                :  +- Exchange hashpartitioning(coalesce(C1#2603, ), isnull(C1#2603), coalesce(C2#2604L, 0), isnull(C2#2604L), 200), ENSURE_REQUIREMENTS, [plan_id=11250]
                :     +- Scan ExistingRDD[C1#2603,C2#2604L]
                +- Sort [coalesce(C1#2607, ) ASC NULLS FIRST, isnull(C1#2607) ASC NULLS FIRST, coalesce(C2#2608L, 0) ASC NULLS FIRST, isnull(C2#2608L) ASC NULLS FIRST], false, 0
                   +- Exchange hashpartitioning(coalesce(C1#2607, ), isnull(C1#2607), coalesce(C2#2608L, 0), isnull(C2#2608L), 200), ENSURE_REQUIREMENTS, [plan_id=11251]
                      +- Scan ExistingRDD[C1#2607,C2#2608L]
    <BLANKLINE>
    <BLANKLINE>
    ```
    
    The plan produced by Spark Connect is shown below.
    ```
    == Physical Plan ==
    AdaptiveSparkPlan isFinalPlan=false
    +- HashAggregate(keys=[C1#1988, C2#1989L], functions=[])
       +- Exchange hashpartitioning(C1#1988, C2#1989L, 200), ENSURE_REQUIREMENTS, [plan_id=1550]
          +- HashAggregate(keys=[C1#1988, C2#1989L], functions=[])
             +- BroadcastHashJoin [coalesce(C1#1988, ), isnull(C1#1988), coalesce(C2#1989L, 0), isnull(C2#1989L)], [coalesce(C1#1996, ), isnull(C1#1996), coalesce(C2#1997L, 0), isnull(C2#1997L)], LeftSemi, BuildRight, false
                :- LocalTableScan [C1#1988, C2#1989L]
                +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ), isnull(input[0, string, true]), coalesce(input[1, bigint, true], 0), isnull(input[1, bigint, true])),false), [plan_id=1546]
                   +- LocalTableScan [C1#1996, C2#1997L]
    ```
    
    So the shuffle, and the differing join strategies, affect the order of the output rows.
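    The flakiness can be sketched in plain Python (an illustration only, not the Spark API): the distinct rows common to both inputs come back in no guaranteed order, so the doctest has to impose one, just as the fix does with `sort`.
    ```python
    # Plain-Python stand-in for the doctest data (hypothetical, not Spark).
    df1 = [("a", 1), ("a", 1), ("b", 3), ("c", 4)]
    df2 = [("a", 1), ("a", 1), ("b", 3)]
    # Like intersect after a shuffle: distinct common rows, iteration order unspecified.
    common = set(df1) & set(df2)
    # Mirrors sort(df1.C1.desc()): order by the first column, descending.
    rows = sorted(common, key=lambda r: r[0], reverse=True)
    print(rows)  # [('b', 3), ('a', 1)]
    ```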
    
    ### Why are the changes needed?
    This PR adds a `sort` operator to the `DataFrame.intersect` example so that the output has a deterministic order.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    Manual tests and doctests.
    
    Closes #39483 from beliefer/SPARK-41886.
    
    Authored-by: Jiaan Geng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/connect/dataframe.py | 3 ---
 python/pyspark/sql/dataframe.py         | 2 +-
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index b81244dc113..c9cd65b93b7 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1632,9 +1632,6 @@ def _test() -> None:
         del pyspark.sql.connect.dataframe.DataFrame.drop.__doc__
         del pyspark.sql.connect.dataframe.DataFrame.join.__doc__
 
-        # TODO(SPARK-41886): The doctest output has different order
-        del pyspark.sql.connect.dataframe.DataFrame.intersect.__doc__
-
         # TODO(SPARK-41625): Support Structured Streaming
         del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__
 
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a437400fb90..09a5e9d0b07 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -3657,7 +3657,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         --------
         >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
         >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
-        >>> df1.intersect(df2).show()
+        >>> df1.intersect(df2).sort(df1.C1.desc()).show()
         +---+---+
         | C1| C2|
         +---+---+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
