This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 177aee0707c2 [SPARK-55886][SQL][PYTHON] Add DataFrame.zip for merging 
column-projected DataFrames
177aee0707c2 is described below

commit 177aee0707c210beefe067e6b0348a388c69cae7
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Sun May 31 20:43:52 2026 +0800

    [SPARK-55886][SQL][PYTHON] Add DataFrame.zip for merging column-projected 
DataFrames
    
    ### What changes were proposed in this pull request?
    
    Add a new `DataFrame.zip(other)` API that combines the columns of two 
DataFrames side-by-side without a join key by reusing the shared base plan 
rather than emitting a relational join.
    
    - **Logical plan**: introduces `Zip(left, right)`, always unresolved, with 
a `ZIP` tree pattern.
    - **Analyzer**: a new `ResolveZip` rule walks both `Project` chains down to 
their bases and -- when the two bases produce the same canonicalized result -- 
rewrites the `Zip` into a chain of `Project` nodes over that shared base. Each 
user-written alias is kept in its own `Alias`, grouped into one `Project` layer 
per dependency depth, so `CollapseProject`'s existing safety guards 
(`canCollapseExpressions`) still apply and nondeterministic/expensive producers 
are never duplicated. Alias [...]
    - **CheckAnalysis**: any `Zip` that survives `ResolveZip` (the two sides do 
not share a base, or a side contains a non-scalar Python UDF) raises a new 
`ZIP_PLANS_NOT_MERGEABLE` error (sqlState `42K03`).
    - **Scala API**: `Dataset.zip` is declared in `sql/api` and implemented in the 
classic `Dataset`; Spark Connect throws `UnsupportedOperationException` for now 
(Connect support is planned for a follow-up).
    - **PySpark**: abstract `DataFrame.zip` on the parent class, a classic 
implementation delegating to the JVM via `_jdf.zip`, and a Connect stub that 
raises `PySparkNotImplementedError`. Adds a new entry in the API reference index.
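
    A rough illustration of the "one `Project` layer per dependency depth"
    grouping from the Analyzer bullet, written as a toy Python model and not the
    actual `ResolveZip` code. The `layer_by_depth` helper and its
    dict-of-references input are assumptions made for this sketch; real aliases
    are Catalyst expressions, not strings.

```python
from typing import Dict, List, Set


def layer_by_depth(aliases: Dict[str, Set[str]]) -> List[List[str]]:
    """Group alias names into layers by dependency depth.

    `aliases` maps each alias name to the names its expression references.
    Any name that is not itself an alias is treated as a base attribute.
    Depth 1 reads only base attributes; depth k reads at least one
    depth-(k-1) alias. Assumes the references are acyclic.
    """
    depth: Dict[str, int] = {}

    def depth_of(name: str) -> int:
        if name not in aliases:  # base attribute
            return 0
        if name not in depth:
            refs = aliases[name]
            depth[name] = 1 + max((depth_of(r) for r in refs), default=0)
        return depth[name]

    layers: Dict[int, List[str]] = {}
    for name in aliases:
        layers.setdefault(depth_of(name), []).append(name)
    return [layers[d] for d in sorted(layers)]


# Left side: a_plus_1 = a + 1, then doubled = a_plus_1 * 2.
# Right side: b_times_2 = b * 2.
example = {
    "a_plus_1": {"a"},
    "doubled": {"a_plus_1"},
    "b_times_2": {"b"},
}
print(layer_by_depth(example))
# [['a_plus_1', 'b_times_2'], ['doubled']]
```

    Each inner list stands for one `Project` layer over the shared base, so every
    alias stays in its own slot and `CollapseProject` can later decide whether
    inlining it is safe.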
    
    ### Why are the changes needed?
    
    `RDD.zip` is a natural way to project two views of the same data and 
recombine them row-for-row. There has been no DataFrame equivalent: users 
porting that pattern have to fall back to a join on a synthetic row id, or 
recompute the source and select both column sets, which adds unnecessary work 
to the plan (a shuffle/join, or duplicated source evaluation) when the two 
sides are known to be row-aligned by construction.
    
    `DataFrame.zip` lifts the RDD pattern to the DataFrame API. Because the 
analyzer rewrites the operator into a `Project` chain over the shared base 
(collapsed by the optimizer), the zip itself adds no runtime cost beyond 
evaluating the projected expressions: no join, no extra scan, no shuffle.
    
    Side-by-side:
    
    ```python
    # RDD: rdd.zip lines up two row-aligned projections of the same source.
    square  = lambda x: x * x
    is_even = lambda x: x % 2 == 0
    
    rdd   = sc.range(10)
    rdd1  = rdd.map(square)
    rdd2  = rdd.map(is_even)
    zipped_rdd = rdd1.zip(rdd2).collect()
    # [(0, True), (1, False), (4, True), (9, False), (16, True),
    #  (25, False), (36, True), (49, False), (64, True), (81, False)]
    
    # DataFrame: the same pattern, now expressible directly.
    square_udf  = sf.udf(square,  LongType())
    is_even_udf = sf.udf(is_even, BooleanType())
    
    df  = spark.range(10)
    df1 = df.select(square_udf("id").alias("square"))
    df2 = df.select(is_even_udf("id").alias("is_even"))
    df1.zip(df2).show()
    # +------+-------+
    # |square|is_even|
    # +------+-------+
    # |     0|   true|
    # |     1|  false|
    # |     4|   true|
    # |     9|  false|
    # |    16|   true|
    # |    25|  false|
    # |    36|   true|
    # |    49|  false|
    # |    64|   true|
    # |    81|  false|
    # +------+-------+
    ```
    
    Additional patterns this enables:
    - Computing a derived column on one branch and aligning it with a derived 
column from the same source.
    - Splitting a single transformation into independently named sub-DataFrames 
and recombining them.
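
    The mergeability condition behind these patterns can be modeled in a few
    lines. This is only a sketch under simplifying assumptions: `Base`,
    `Project`, `zip_plans`, and `ZipPlansNotMergeable` are hypothetical names for
    this example, plain equality stands in for Catalyst's `sameResult`
    comparison, and the result is a single projection rather than a layered
    chain.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Base:
    """Stand-in for everything below the Project chains (scan, filter, ...)."""
    source: str
    columns: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    """Stand-in for a projection-only step such as select or withColumn."""
    columns: Tuple[str, ...]
    child: "Plan"


Plan = Union[Base, Project]


class ZipPlansNotMergeable(Exception):
    """Hypothetical stand-in for the ZIP_PLANS_NOT_MERGEABLE analysis error."""


def strip_projects(plan: Plan) -> Base:
    # Walk down the Project chain until the shared base is reached.
    while isinstance(plan, Project):
        plan = plan.child
    return plan


def zip_plans(left: Plan, right: Plan) -> Project:
    left_base, right_base = strip_projects(left), strip_projects(right)
    if left_base != right_base:
        raise ZipPlansNotMergeable("the two sides do not share a base")
    # Left columns first, then right columns, over the single shared base.
    return Project(left.columns + right.columns, left_base)


df = Base("t", ("a", "b", "c"))
left = Project(("a",), Project(("a", "b"), df))  # two stacked selects
right = Project(("c",), df)                      # one select
merged = zip_plans(left, right)
print(merged.columns)       # ('a', 'c')
print(merged.child is df)   # True

try:
    zip_plans(left, Project(("x",), Base("other", ("x",))))
except ZipPlansNotMergeable as e:
    print("rejected:", e)
```

    The merged plan reads the base once, which is the property that makes a join
    or a second scan unnecessary.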
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. New public API:
    
    - Scala: `Dataset.zip(other: Dataset[_]): DataFrame`, `since 4.3.0`.
    - PySpark: `DataFrame.zip(other)`, `versionadded:: 4.3.0`.
    
    ### How was this patch tested?
    
    - New `ResolveZipSuite` (catalyst) covering the analyzer rewrite: matching 
bases, mismatched bases, expression projections, partial `Project` on one side, 
unresolved children, longer chains of `Project`s, alias composition through 
chains, stacked `withColumn`-style projections, different-instance bases with 
the same canonicalized plan (exercises the positional attribute remap), and 
both `ZIP_PLANS_NOT_MERGEABLE` triggers (mismatched bases and non-scalar Python 
UDF).
    - New `DataFrameZipSuite` (sql/core) covering end-to-end results, the 
resolved-plan shape, `withColumn`/`withColumnRenamed` on both sides, longer 
chains, parent-with-chained-child, the two error cases (unrelated DataFrames, 
different `spark.range` sources), and two guards that a shared producer is evaluated only 
once -- a single-side `rand()` referenced twice, and a `rand()` in a parent 
shared by both sides (asserting exactly one `Rand` in the optimized plan).
    - New `python/pyspark/sql/tests/test_zip.py` mirroring the Scala suite plus 
scalar Python UDF and pandas UDF cases, and `test_parity_zip.py` asserting 
Spark Connect raises `NOT_IMPLEMENTED`.
    - MIMA exclusion added for the new abstract method on `Dataset`.
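
    The shared-producer guards above rely on deduplicating aliases by their
    canonicalized child expression. A minimal Python sketch of that bookkeeping
    follows; the `dedup_aliases` name and the use of plain strings as "canonical
    forms" are assumptions for illustration, not the Catalyst implementation.

```python
from typing import Dict, List, Tuple

# (alias name, canonical form of the child expression); strings are a toy
# substitute for canonicalized Catalyst expressions.
Alias = Tuple[str, str]


def dedup_aliases(aliases: List[Alias]) -> Tuple[List[Alias], Dict[str, str]]:
    """Keep the first alias per distinct child; map dropped names to survivors."""
    kept: Dict[str, str] = {}  # canonical child -> surviving alias name
    dropped_to_survivor: Dict[str, str] = {}
    for name, canonical_child in aliases:
        if canonical_child in kept:
            dropped_to_survivor[name] = kept[canonical_child]
        else:
            kept[canonical_child] = name
    # dicts preserve insertion order, so survivors keep first-seen order.
    survivors = [(name, child) for child, name in kept.items()]
    return survivors, dropped_to_survivor


# Both sides inherit the same seeded rand() from a shared parent projection.
aliases = [("r_left", "rand(42)"), ("x", "a + 1"), ("r_right", "rand(42)")]
survivors, remap = dedup_aliases(aliases)
print(survivors)  # [('r_left', 'rand(42)'), ('x', 'a + 1')]
print(remap)      # {'r_right': 'r_left'}
```

    References to a dropped alias are then rewritten to its survivor, so the
    producer appears only once in the merged plan.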
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code
    
    Closes #54976 from zhengruifeng/df-zip.
    
    Lead-authored-by: Ruifeng Zheng <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   6 +
 dev/sparktestsupport/modules.py                    |   2 +
 project/MimaExcludes.scala                         |   2 +
 .../source/reference/pyspark.sql/dataframe.rst     |   1 +
 python/pyspark/sql/classic/dataframe.py            |   3 +
 python/pyspark/sql/connect/dataframe.py            |  10 +
 python/pyspark/sql/dataframe.py                    |  43 ++++
 .../pyspark/sql/tests/connect/test_parity_zip.py   |  37 ++++
 python/pyspark/sql/tests/test_zip.py               | 185 +++++++++++++++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  22 ++
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   1 +
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   9 +
 .../spark/sql/catalyst/analysis/ResolveZip.scala   | 199 ++++++++++++++++++
 .../plans/logical/basicLogicalOperators.scala      |  24 +++
 .../sql/catalyst/rules/RuleIdCollection.scala      |   1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala    |   1 +
 .../sql/catalyst/analysis/ResolveZipSuite.scala    | 211 +++++++++++++++++++
 .../org/apache/spark/sql/connect/Dataset.scala     |   5 +
 .../org/apache/spark/sql/classic/Dataset.scala     |   5 +
 .../org/apache/spark/sql/DataFrameZipSuite.scala   | 229 +++++++++++++++++++++
 20 files changed, 996 insertions(+)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 8d0a7d20eb14..91976d21934d 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8970,6 +8970,12 @@
     ],
     "sqlState" : "42KDF"
   },
+  "ZIP_PLANS_NOT_MERGEABLE" : {
+    "message" : [
+      "The two DataFrames in zip() cannot be merged. They must produce the 
same canonicalized plan after stripping outer Project chains, and must not 
contain non-scalar Python UDFs."
+    ],
+    "sqlState" : "42K03"
+  },
   "_LEGACY_ERROR_TEMP_0001" : {
     "message" : [
       "Invalid InsertIntoContext."
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 286a0c35b27e..7a4e50ff4141 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -614,6 +614,7 @@ pyspark_sql = Module(
         "pyspark.sql.tests.test_serde",
         "pyspark.sql.tests.test_session",
         "pyspark.sql.tests.test_nearest_by_join",
+        "pyspark.sql.tests.test_zip",
         "pyspark.sql.tests.test_subquery",
         "pyspark.sql.tests.test_types",
         "pyspark.sql.tests.test_geographytype",
@@ -1178,6 +1179,7 @@ pyspark_connect = Module(
         "pyspark.sql.tests.connect.test_parity_repartition",
         "pyspark.sql.tests.connect.test_parity_stat",
         "pyspark.sql.tests.connect.test_parity_nearest_by_join",
+        "pyspark.sql.tests.connect.test_parity_zip",
         "pyspark.sql.tests.connect.test_parity_subquery",
         "pyspark.sql.tests.connect.test_parity_types",
         "pyspark.sql.tests.connect.test_parity_column",
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b004e62ae739..422db17e1036 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -50,6 +50,8 @@ object MimaExcludes {
     
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this"),
     // [SPARK-55228][SQL] Implement Dataset.zipWithIndex in Scala API
     
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zipWithIndex"),
+    // [SPARK-55886][SQL] Add DataFrame.zip
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zip"),
     // [SPARK-55949][SQL] Add DataFrame API for CDC queries
     
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.DataFrameReader.changes"),
     
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamReader.changes"),
diff --git a/python/docs/source/reference/pyspark.sql/dataframe.rst 
b/python/docs/source/reference/pyspark.sql/dataframe.rst
index e61100435664..22e88bb67d5e 100644
--- a/python/docs/source/reference/pyspark.sql/dataframe.rst
+++ b/python/docs/source/reference/pyspark.sql/dataframe.rst
@@ -141,6 +141,7 @@ DataFrame
     DataFrame.writeTo
     DataFrame.mergeInto
     DataFrame.pandas_api
+    DataFrame.zip
     DataFrame.zipWithIndex
     DataFrameNaFunctions.drop
     DataFrameNaFunctions.fill
diff --git a/python/pyspark/sql/classic/dataframe.py 
b/python/pyspark/sql/classic/dataframe.py
index e1c3380416e4..a4288270a5ad 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -776,6 +776,9 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, 
PandasConversionMixin):
         jdf = self._jdf.crossJoin(other._jdf)
         return DataFrame(jdf, self.sparkSession)
 
+    def zip(self, other: ParentDataFrame) -> ParentDataFrame:
+        return DataFrame(self._jdf.zip(other._jdf), self.sparkSession)
+
     def join(
         self,
         other: ParentDataFrame,
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index b0a9692f289a..0c8e8d9fe9ba 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -379,6 +379,12 @@ class DataFrame(ParentDataFrame):
             session=self._session,
         )
 
+    def zip(self, other: ParentDataFrame) -> ParentDataFrame:
+        raise PySparkNotImplementedError(
+            errorClass="NOT_IMPLEMENTED",
+            messageParameters={"feature": "zip"},
+        )
+
     def _check_same_session(self, other: ParentDataFrame) -> "DataFrame":
         if (
             not isinstance(other, DataFrame)
@@ -2509,6 +2515,10 @@ def _test() -> None:
 
     globs = pyspark.sql.dataframe.__dict__.copy()
 
+    # `zip` is not yet supported on Spark Connect; the parent docstring's
+    # example would call into the connect impl and fail with NOT_IMPLEMENTED.
+    del pyspark.sql.dataframe.DataFrame.zip.__doc__
+
     if not is_remote_only():
         del pyspark.sql.dataframe.DataFrame.toJSON.__doc__
         del pyspark.sql.dataframe.DataFrame.rdd.__doc__
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index f96b614624b7..6c4d32ea1797 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2586,6 +2586,49 @@ class DataFrame:
         """
         ...
 
+    @dispatch_df_method
+    def zip(self, other: "DataFrame") -> "DataFrame":
+        """Combines the columns of this :class:`DataFrame` with another 
:class:`DataFrame`
+        side-by-side, preserving row alignment between the two inputs.
+
+        Both DataFrames must produce the same canonicalized plan after 
stripping outer
+        ``Project`` chains. In practice this means they derive from a common 
source through
+        chains of projection-only operations (:meth:`select`, 
:meth:`withColumn`,
+        :meth:`withColumnRenamed`, etc.); the chains may differ between the 
two sides, but
+        anything below them, including any :meth:`filter`, :meth:`orderBy`, 
:meth:`join`,
+        or aggregation, must be identical on both sides so the two sides stay 
row-aligned.
+        Non-scalar Python UDFs (e.g., ``GROUPED_MAP``) are not allowed on 
either side. An
+        :class:`AnalysisException` is thrown when the two DataFrames cannot be 
aligned.
+
+        .. versionadded:: 4.3.0
+
+        Parameters
+        ----------
+        other : :class:`DataFrame`
+            The DataFrame to combine with, which must derive from the same 
source as this
+            DataFrame.
+
+        Returns
+        -------
+        :class:`DataFrame`
+            A new DataFrame containing the columns of this DataFrame followed 
by the columns
+            of `other`.
+
+        Examples
+        --------
+        >>> df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["a", "b", "c"])
+        >>> left = df.select("a")
+        >>> right = df.select("b")
+        >>> left.zip(right).show()
+        +---+---+
+        |  a|  b|
+        +---+---+
+        |  1|  2|
+        |  4|  5|
+        +---+---+
+        """
+        ...
+
     @dispatch_df_method
     def join(
         self,
diff --git a/python/pyspark/sql/tests/connect/test_parity_zip.py 
b/python/pyspark/sql/tests/connect/test_parity_zip.py
new file mode 100644
index 000000000000..b0564e63a48b
--- /dev/null
+++ b/python/pyspark/sql/tests/connect/test_parity_zip.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.errors import PySparkNotImplementedError
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class ZipParityTests(ReusedConnectTestCase):
+    """`DataFrame.zip` is classic-only for now; assert the Connect stub raises 
a clean
+    NOT_IMPLEMENTED instead of falling through to a generic error or appearing 
to work."""
+
+    def test_zip_raises_not_implemented(self):
+        df = self.spark.createDataFrame([(1, 2)], ["a", "b"])
+        with self.assertRaises(PySparkNotImplementedError) as ctx:
+            df.select("a").zip(df.select("b"))
+        self.assertEqual(ctx.exception.getCondition(), "NOT_IMPLEMENTED")
+        self.assertIn("zip", str(ctx.exception))
+
+
+if __name__ == "__main__":
+    from pyspark.testing import main
+
+    main()
diff --git a/python/pyspark/sql/tests/test_zip.py 
b/python/pyspark/sql/tests/test_zip.py
new file mode 100644
index 000000000000..33984657904e
--- /dev/null
+++ b/python/pyspark/sql/tests/test_zip.py
@@ -0,0 +1,185 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.errors import AnalysisException
+from pyspark.sql import Row
+from pyspark.sql import functions as sf
+from pyspark.sql.types import IntegerType
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+from pyspark.testing.utils import (
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+
+
+class DataFrameZipTestsMixin:
+    """Tests for DataFrame.zip(). Currently only the classic path is supported;
+    Spark Connect raises ``NOT_IMPLEMENTED``."""
+
+    def test_zip_select_different_columns(self):
+        df = self.spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)], 
["a", "b", "c"])
+        zipped = df.select("a").zip(df.select("b"))
+        self.assertEqual(zipped.columns, ["a", "b"])
+        self.assertEqual(
+            sorted(zipped.collect()),
+            [Row(a=1, b=2), Row(a=4, b=5), Row(a=7, b=8)],
+        )
+
+    def test_zip_with_expressions(self):
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", 
"b"])
+        left = df.select((sf.col("a") + 1).alias("a_plus_1"))
+        right = df.select((sf.col("b") * 2).alias("b_times_2"))
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [
+                Row(a_plus_1=2, b_times_2=20),
+                Row(a_plus_1=3, b_times_2=40),
+                Row(a_plus_1=4, b_times_2=60),
+            ],
+        )
+
+    def test_zip_one_side_is_base(self):
+        df = self.spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])
+        right = df.select((sf.col("a") + sf.col("b")).alias("sum"))
+        self.assertEqual(
+            sorted(df.zip(right).collect()),
+            [Row(a=1, b=2, sum=3), Row(a=3, b=4, sum=7)],
+        )
+
+    def test_zip_with_python_udf(self):
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", 
"b"])
+        plus_one = sf.udf(lambda x: x + 1, IntegerType())
+        left = df.select(plus_one(sf.col("a")).alias("a_plus_1"))
+        right = df.select(plus_one(sf.col("b")).alias("b_plus_1"))
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [
+                Row(a_plus_1=2, b_plus_1=11),
+                Row(a_plus_1=3, b_plus_1=21),
+                Row(a_plus_1=4, b_plus_1=31),
+            ],
+        )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        pandas_requirement_message or pyarrow_requirement_message,
+    )
+    def test_zip_with_pandas_udf(self):
+        import pandas as pd
+
+        @sf.pandas_udf(IntegerType())
+        def plus_one(s: pd.Series) -> pd.Series:
+            return s + 1
+
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", 
"b"])
+        left = df.select(plus_one(sf.col("a")).alias("a_plus_1"))
+        right = df.select(plus_one(sf.col("b")).alias("b_plus_1"))
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [
+                Row(a_plus_1=2, b_plus_1=11),
+                Row(a_plus_1=3, b_plus_1=21),
+                Row(a_plus_1=4, b_plus_1=31),
+            ],
+        )
+
+    def test_zip_different_bases_throws(self):
+        df1 = self.spark.createDataFrame([(1, 2)], ["a", "b"])
+        df2 = self.spark.createDataFrame([(3, 4, 5)], ["x", "y", "z"])
+        with self.assertRaises(AnalysisException) as ctx:
+            df1.select("a").zip(df2.select("x")).schema
+        self.assertEqual(ctx.exception.getCondition(), 
"ZIP_PLANS_NOT_MERGEABLE")
+
+    def test_zip_different_range_bases_throws(self):
+        df1 = self.spark.range(10).toDF("id1")
+        df2 = self.spark.range(20).toDF("id2")
+        with self.assertRaises(AnalysisException) as ctx:
+            df1.zip(df2).schema
+        self.assertEqual(ctx.exception.getCondition(), 
"ZIP_PLANS_NOT_MERGEABLE")
+
+    def test_zip_with_withColumn(self):
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", 
"b"])
+        left = df.withColumn("a_plus_1", sf.col("a") + 1)
+        right = df.withColumn("b_times_2", sf.col("b") * 2)
+        zipped = left.zip(right)
+        # Schema has duplicates (a, b appear twice) since withColumn keeps 
original columns.
+        self.assertEqual(zipped.columns, ["a", "b", "a_plus_1", "a", "b", 
"b_times_2"])
+        rows = sorted(zipped.collect(), key=lambda r: r[0])
+        self.assertEqual(
+            [tuple(r) for r in rows],
+            [(1, 10, 2, 1, 10, 20), (2, 20, 3, 2, 20, 40), (3, 30, 4, 3, 30, 
60)],
+        )
+
+    def test_zip_with_withColumnRenamed(self):
+        df = self.spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])
+        left = df.withColumnRenamed("a", "a1")
+        right = df.withColumnRenamed("b", "b1")
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [Row(a1=1, b=2, a=1, b1=2), Row(a1=3, b=4, a=3, b1=4)],
+        )
+
+    def test_zip_chained_withColumn(self):
+        # Stack two withColumn calls on left (two Project layers) and one on 
right.
+        df = self.spark.createDataFrame([(1, 10), (2, 20)], ["a", "b"])
+        left = df.withColumn("a_plus_1", sf.col("a") + 
1).withColumn("a_plus_2", sf.col("a") + 2)
+        right = df.withColumn("b_times_2", sf.col("b") * 2)
+        zipped = left.zip(right)
+        self.assertEqual(
+            zipped.columns,
+            ["a", "b", "a_plus_1", "a_plus_2", "a", "b", "b_times_2"],
+        )
+        rows = sorted(zipped.collect(), key=lambda r: r[0])
+        self.assertEqual(
+            [tuple(r) for r in rows],
+            [(1, 10, 2, 3, 1, 10, 20), (2, 20, 3, 4, 2, 20, 40)],
+        )
+
+    def test_zip_longer_chain(self):
+        # Left has three nested Projects; right has one.
+        df = self.spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["a", "b", 
"c"])
+        left = df.select("a", "b", "c").select("a", "b").select("a")
+        right = df.select("c")
+        self.assertEqual(
+            sorted(left.zip(right).collect()),
+            [Row(a=1, c=3), Row(a=4, c=6)],
+        )
+
+    def test_zip_parent_with_chained_child(self):
+        # df.zip(<chained projection of df>) -- the parent has no Project, 
child has many.
+        df = self.spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])
+        child = df.select((sf.col("a") + 1).alias("a_plus_1")).select(
+            (sf.col("a_plus_1") * 2).alias("doubled")
+        )
+        self.assertEqual(
+            sorted(df.zip(child).collect()),
+            [Row(a=1, b=2, doubled=4), Row(a=3, b=4, doubled=8)],
+        )
+
+
+class DataFrameZipTests(DataFrameZipTestsMixin, ReusedSQLTestCase):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.testing import main
+
+    main()
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
index 38765262e1fc..0a0b141e9e83 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -819,6 +819,28 @@ abstract class Dataset[T] extends Serializable {
    */
   def crossJoin(right: Dataset[_]): DataFrame
 
+  /**
+   * Combines the columns of this DataFrame with another DataFrame 
side-by-side, preserving row
+   * alignment between the two inputs.
+   *
+   * Both DataFrames must produce the same canonicalized plan after stripping 
outer `Project`
+   * chains. In practice this means they derive from a common source through 
chains of
+   * projection-only operations (`select`, `withColumn`, `withColumnRenamed`, 
etc.); the chains
+   * may differ between the two sides, but anything below them -- including 
any `filter`,
+   * `orderBy`, `join`, or aggregation -- must be identical on both sides so 
the two sides stay
+   * row-aligned. Non-scalar Python UDFs (e.g., `GROUPED_MAP`) are not allowed 
on either side. An
+   * `AnalysisException` is thrown when the two DataFrames cannot be aligned.
+   *
+   * @param other
+   *   The DataFrame to combine with, which must derive from the same source 
as this DataFrame.
+   * @return
+   *   A new DataFrame containing the columns of this DataFrame followed by 
the columns of
+   *   `other`.
+   * @group untypedrel
+   * @since 4.3.0
+   */
+  def zip(other: Dataset[_]): DataFrame
+
   /**
    * Joins this Dataset returning a `Tuple2` for each pair where `condition` 
evaluates to true.
    *
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b47c0031ddd4..b4b25d7c048f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -563,6 +563,7 @@ class Analyzer(
       ResolveBinaryArithmetic ::
       new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
+      ResolveZip ::
       FlattenSequentialStreamingUnion ::
       ValidateSequentialStreamingUnion ::
       ResolveRowLevelCommandAssignments ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 4e07280f94c9..c63fa6398cbd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -701,6 +701,15 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
                 "expression" -> toSQLExpr(rankingExpression),
                 "type" -> toSQLType(rankingExpression.dataType)))
 
+          case z: Zip =>
+            // ResolveZip succeeded for all valid inputs, so a surviving Zip 
means the two
+            // sides either don't share a base or contain a non-scalar Python 
UDF. Either way
+            // we surface ZIP_PLANS_NOT_MERGEABLE -- without this we'd fall 
through to the
+            // generic unresolved-operator INTERNAL_ERROR catch-all.
+            z.failAnalysis(
+              errorClass = "ZIP_PLANS_NOT_MERGEABLE",
+              messageParameters = Map.empty)
+
           case a: Aggregate =>
             a.groupingExpressions.foreach(
               expression =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala
new file mode 100644
index 000000000000..7b6ec5b909cd
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveZip.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeMap, Expression, ExprId, NamedExpression, PythonUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Zip}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.ZIP
+
+/**
+ * Resolves a [[Zip]] node by rewriting it into a chain of [[Project]] nodes 
over the shared
+ * base plan.
+ *
+ * The two children of `Zip` must produce the same canonicalized plan after 
stripping outer
+ * `Project` chains, and the chains themselves must contain only scalar 
expressions
+ * (`Project.resolved` already rejects Generator, AggregateExpression, and 
WindowExpression;
+ * this rule additionally rejects non-scalar Python UDFs that break the 1:1 
row mapping).
+ *
+ * The rewrite collects every alias introduced by either chain, deduplicates 
aliases that
+ * share the same canonicalized child (a shared parent that feeds both sides 
is re-instanced
+ * by the analyzer, so its producer surfaces twice), groups them by dependency 
depth (depth 1
+ * = references only base attributes; depth k = references at least one 
depth-(k-1) alias), and
+ * emits one `Project` layer per depth so each user-written alias stays in its 
own `Alias`.
+ * `CollapseProject` runs later with its existing safety guards 
(`canCollapseExpressions`), so
+ * nondeterministic producers (`rand()`, `uuid()`) and expensive producers 
referenced more than
+ * once stay separate -- avoiding the double evaluation that an unguarded 
inline would cause.
+ *
+ * If the two sides cannot be merged, the `Zip` node remains unresolved and 
`CheckAnalysis`
+ * reports a `ZIP_PLANS_NOT_MERGEABLE` error.
+ */
+object ResolveZip extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+    _.containsPattern(ZIP), ruleId) {
+    case z: Zip if z.childrenResolved => tryMerge(z).getOrElse(z)
+  }
+
+  private def tryMerge(z: Zip): Option[LogicalPlan] = {
+    val (leftAliases, leftTopList, leftBase) = analyzeChain(z.left)
+    val (rightAliases, rightTopList, rightBase) = analyzeChain(z.right)
+
+    if (!leftBase.sameResult(rightBase)) return None
+    if (!allScalar(leftAliases ++ rightAliases)) return None
+
+    // Right base's attributes may have different exprIds than the left base's 
even when the
+    // two bases are `sameResult`. Map positionally so right-side references 
resolve against
+    // the left base in the merged plan.
+    val attrMapping: AttributeMap[Attribute] =
+      AttributeMap(rightBase.output.zip(leftBase.output))
+    val remappedRightAliases = rightAliases.map(remapAlias(_, attrMapping))
+    val remappedRightTopList = rightTopList.map(remapNamedExpr(_, attrMapping))
+
+    // When both sides walk through a shared parent `Project`, the analyzer re-instances the
+    // right side, so the shared producer surfaces as two aliases with different exprIds but the
+    // same child expression (e.g. two `rand(sameSeed)`). Keeping both would evaluate the producer
+    // twice per row. Deduplicate by canonicalized child: keep the first alias for each distinct
+    // child, and remap references to the dropped aliases (in surviving alias bodies and in the
+    // output lists) to the survivor. A freshly written producer gets a distinct seed, so its
+    // canonical form differs and it is never merged with an unrelated one.
+    val canonToKept = mutable.LinkedHashMap.empty[Expression, Alias]
+    val droppedToSurvivor = mutable.HashMap.empty[ExprId, Attribute]
+    (leftAliases ++ remappedRightAliases).foreach { a =>
+      canonToKept.get(a.child.canonicalized) match {
+        case Some(kept) => droppedToSurvivor(a.exprId) = kept.toAttribute
+        case None => canonToKept(a.child.canonicalized) = a
+      }
+    }
+
+    def remapDropped[E <: Expression](e: E): E = if (droppedToSurvivor.isEmpty) {
+      e
+    } else {
+      e.transform { case a: Attribute => droppedToSurvivor.getOrElse(a.exprId, a) }
+        .asInstanceOf[E]
+    }
+
+    // Rewrite surviving alias bodies so references to dropped aliases point at the survivors.
+    val dedupedAliases = canonToKept.values.toSeq.map { a =>
+      remapAliasChild(a, remapDropped(a.child))
+    }
+    val layered = buildLayeredChain(dedupedAliases, leftBase)
+    // Build the top-level output list. For each top-level expression:
+    //   - If it references a dropped alias, use the survivor attribute -- but re-alias it to the
+    //     dropped column's own name, exprId, and metadata so the schema stays correct. Two sides
+    //     may expose the same deterministic producer under different names (e.g.
+    //     df.select($"a".as("x")).zip(df.select($"a".as("y")))); dedup keeps the survivor's
+    //     identity only for internal references, not for user-visible output columns. Only the
+    //     producer (value) is shared; the dropped column keeps its own name and metadata.
+    //   - Otherwise pass the attribute through unchanged.
+    val finalProjectList: Seq[NamedExpression] =
+      (leftTopList ++ remappedRightTopList).map { ne =>
+        val attr = ne.toAttribute
+        droppedToSurvivor.get(attr.exprId) match {
+          case Some(survivorAttr) =>
+            // Force explicitMetadata so the output column keeps the dropped column's own metadata
+            // even though its value now comes from the survivor attribute.
+            Alias(survivorAttr, attr.name)(
+              exprId = attr.exprId, explicitMetadata = Some(attr.metadata))
+          case None => remapDropped(attr)
+        }
+      }
+    Some(Project(finalProjectList, layered))
+  }
+
+  /**
+   * Walks a chain of `Project` nodes and returns:
+   *   - every `Alias` introduced anywhere in the chain (deepest first, then outward),
+   *   - the topmost `Project`'s projection list (or the plan's output if there is no top
+   *     `Project`), used to drive the final output column list, and
+   *   - the chain's base plan (first non-`Project` node).
+   */
+  private def analyzeChain(
+      plan: LogicalPlan): (Seq[Alias], Seq[NamedExpression], LogicalPlan) = plan match {
+    case Project(exprs, child) =>
+      val (childAliases, _, base) = analyzeChain(child)
+      val newAliases = exprs.collect { case a: Alias => a }
+      (childAliases ++ newAliases, exprs, base)
+    case other =>
+      (Seq.empty, other.output, other)
+  }
+
+  /** Rewrites a single `Alias` so its body references the left base's attributes. */
+  private def remapAlias(a: Alias, attrMapping: AttributeMap[Attribute]): Alias = {
+    val newChild = a.child.transform {
+      case attr: Attribute => attrMapping.getOrElse(attr, attr)
+    }
+    remapAliasChild(a, newChild)
+  }
+
+  /** Returns a copy of `a` with `newChild` as its body, preserving name, exprId, and metadata. */
+  private def remapAliasChild(a: Alias, newChild: Expression): Alias = {
+    Alias(newChild, a.name)(
+      exprId = a.exprId,
+      qualifier = a.qualifier,
+      explicitMetadata = a.explicitMetadata,
+      nonInheritableMetadataKeys = a.nonInheritableMetadataKeys)
+  }
+
+  private def remapNamedExpr(
+      ne: NamedExpression, attrMapping: AttributeMap[Attribute]): NamedExpression = ne match {
+    case a: Alias => remapAlias(a, attrMapping)
+    case attr: Attribute => attrMapping.getOrElse(attr, attr)
+    case other =>
+      other.transform { case attr: Attribute => attrMapping.getOrElse(attr, attr) }
+        .asInstanceOf[NamedExpression]
+  }
+
+  /**
+   * Builds a chain of `Project`s over `base`, with one layer per dependency depth so each
+   * user-written alias stays in its own `Alias`. Each layer carries every attribute from the
+   * previous layer (full passthrough) so deeper layers and the top can still reference
+   * earlier columns; `ColumnPruning` removes the unused passthroughs in the optimizer.
+   */
+  private def buildLayeredChain(aliases: Seq[Alias], base: LogicalPlan): LogicalPlan = {
+    if (aliases.isEmpty) return base
+
+    val aliasByExprId: Map[ExprId, Alias] = aliases.map(a => a.exprId -> a).toMap
+    val depthCache = mutable.Map.empty[ExprId, Int]
+    def depthOf(exprId: ExprId): Int = depthCache.getOrElseUpdate(exprId, {
+      val alias = aliasByExprId(exprId)
+      val refDepths = alias.child.collect { case a: Attribute => a }
+        .flatMap(r => aliasByExprId.get(r.exprId).map(_ => depthOf(r.exprId)))
+      if (refDepths.isEmpty) 1 else refDepths.max + 1
+    })
+    aliases.foreach(a => depthOf(a.exprId))
+    val byDepth = aliases.groupBy(a => depthCache(a.exprId)).toSeq.sortBy(_._1)
+
+    byDepth.foldLeft[LogicalPlan](base) { case (acc, (_, depthAliases)) =>
+      Project(acc.output ++ depthAliases, acc)
+    }
+  }
+
+  /**
+   * Returns true if no alias contains a non-scalar Python UDF. `Project.resolved` already
+   * rejects Generator, AggregateExpression, and WindowExpression; this additionally rejects
+   * non-scalar Python UDFs (e.g. `GROUPED_MAP`) that would break the 1:1 row mapping.
+   */
+  private def allScalar(aliases: Seq[Alias]): Boolean = {
+    !aliases.exists(_.exists {
+      case udf: PythonUDF => !PythonUDF.isScalarPythonUDF(udf)
+      case _ => false
+    })
+  }
+}
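
The depth-layering step described in the `buildLayeredChain` doc comment can be illustrated without any Catalyst types. The following is only a minimal sketch under stated assumptions: `ToyAlias`, `DepthLayering`, and `layerByDepth` are hypothetical names that do not appear in the commit, column references are modelled as plain strings, and the alias graph is assumed to be acyclic. It shows the grouping rule only (depth 1 reads base columns, depth k reads at least one depth-(k-1) alias), not the construction of `Project` nodes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Catalyst Alias: an output name plus the names it reads.
final case class ToyAlias(name: String, refs: Set[String])

object DepthLayering {
  /** Groups alias names into layers, shallowest first. Assumes no cyclic references. */
  def layerByDepth(aliases: Seq[ToyAlias]): Seq[Seq[String]] = {
    val byName = aliases.map(a => a.name -> a).toMap
    val depthCache = mutable.Map.empty[String, Int]

    // Depth 1 if the alias reads only base columns, else 1 + the deepest alias it reads.
    def depthOf(name: String): Int = depthCache.get(name) match {
      case Some(d) => d
      case None =>
        val refDepths = byName(name).refs.toSeq.filter(byName.contains).map(depthOf)
        val d = if (refDepths.isEmpty) 1 else refDepths.max + 1
        depthCache(name) = d
        d
    }

    // One group per depth; groupBy keeps the input order inside each group.
    aliases.groupBy(a => depthOf(a.name)).toSeq.sortBy(_._1).map(_._2.map(_.name))
  }
}
```

For example, with `r` reading the base column `id`, `x` reading `r`, and `z` reading the base column `b`, the sketch places `r` and `z` in the first layer and `x` in the second, which mirrors why a producer such as `rand()` stays in its own layer below its consumers.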
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 9184c5ef412b..2fd41281b0b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -834,6 +834,30 @@ case class Join(
   override def isStateful: Boolean = left.isStreaming && right.isStreaming
 }
 
+/**
+ * A logical plan that combines the columns of two DataFrames that derive from the same
+ * base plan through chains of Project nodes. This node is always unresolved and must be
+ * rewritten by [[ResolveZip]] into a chain of Project nodes over the shared base plan
+ * during analysis. If the two children do not share the same base plan (after stripping
+ * outer Projects), or if either side contains a non-scalar Python UDF, analysis will fail
+ * with ZIP_PLANS_NOT_MERGEABLE.
+ */
+case class Zip(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+  override def output: Seq[Attribute] = left.output ++ right.output
+
+  override def maxRows: Option[Long] = left.maxRows
+
+  override def maxRowsPerPartition: Option[Long] = left.maxRowsPerPartition
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(ZIP)
+
+  // Always unresolved -- must be rewritten by ResolveZip during analysis.
+  override lazy val resolved: Boolean = false
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): Zip = copy(left = newLeft, right = newRight)
+}
+
 /**
  * Insert query result into a directory.
  *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index fb6254d82056..36fdd4706016 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -108,6 +108,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.ResolveUnresolvedHaving" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveUpdateEventTimeWatermarkColumn" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" ::
+      "org.apache.spark.sql.catalyst.analysis.ResolveZip" ::
       "org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
       "org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
       "org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 557b01167d88..67016e586a5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -190,6 +190,7 @@ object TreePattern extends Enumeration  {
   val WINDOW: Value = Value
   val WINDOW_GROUP_LIMIT: Value = Value
   val WITH_WINDOW_DEFINITION: Value = Value
+  val ZIP: Value = Value
 
   // Unresolved Plan patterns (Alphabetically ordered)
   val NAMED_STREAMING_RELATION: Value = Value
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala
new file mode 100644
index 000000000000..d297f29ea8d9
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveZipSuite.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.IntegerType
+
+class ResolveZipSuite extends AnalysisTest {
+
+  private val base = LocalRelation($"a".int, $"b".int, $"c".int)
+
+  object Resolve extends RuleExecutor[LogicalPlan] {
+    override val batches: Seq[Batch] = Seq(
+      Batch("ResolveZip", Once, ResolveZip))
+  }
+
+  private def reachesBase(plan: LogicalPlan, expectedBase: LogicalPlan): Boolean = plan match {
+    case Project(_, child) => reachesBase(child, expectedBase)
+    case other => other eq expectedBase
+  }
+
+  test("resolve Zip: both sides have Project over same base") {
+    val left = Project(Seq(base.output(0)), base)
+    val right = Project(Seq(base.output(1)), base)
+    val zip = Zip(left, right)
+
+    val resolved = Resolve.execute(zip)
+    val expected = Project(Seq(base.output(0), base.output(1)), base)
+    comparePlans(resolved, expected)
+  }
+
+  test("resolve Zip: left is bare plan, right has Project") {
+    val right = Project(Seq(base.output(0)), base)
+    val zip = Zip(base, right)
+
+    val resolved = Resolve.execute(zip)
+    val expected = Project(base.output ++ Seq(base.output(0)), base)
+    comparePlans(resolved, expected)
+  }
+
+  test("resolve Zip: both sides are bare same plan") {
+    val zip = Zip(base, base)
+
+    val resolved = Resolve.execute(zip)
+    val expected = Project(base.output ++ base.output, base)
+    comparePlans(resolved, expected)
+  }
+
+  test("resolve Zip: both sides have expressions over same base") {
+    val left = base.select(($"a" + 1).as("a_plus_1"))
+    val right = base.select(($"b" * 2).as("b_times_2"))
+    val zip = Zip(left.analyze, right.analyze)
+
+    val resolved = Resolve.execute(zip)
+    assert(!resolved.isInstanceOf[Zip], "Zip should have been resolved to a Project")
+    assert(resolved.isInstanceOf[Project])
+    assert(resolved.output.length == 2)
+    assert(resolved.output(0).name == "a_plus_1")
+    assert(resolved.output(1).name == "b_times_2")
+  }
+
+  test("resolve Zip: different base plans - Zip remains unresolved") {
+    val base2 = LocalRelation($"x".int, $"y".int, $"z".int, $"w".int)
+    val left = Project(Seq(base.output(0)), base)
+    val right = Project(Seq(base2.output(0)), base2)
+    val zip = Zip(left, right)
+
+    val resolved = Resolve.execute(zip)
+    // ResolveZip cannot merge, so Zip stays
+    assert(resolved.isInstanceOf[Zip])
+  }
+
+  test("resolve Zip: skipped when children are unresolved") {
+    val unresolvedChild = Project(
+      Seq(UnresolvedAttribute("a")),
+      UnresolvedRelation(Seq("t")))
+    val zip = Zip(unresolvedChild, unresolvedChild)
+
+    val result = Resolve.execute(zip)
+    // Zip should remain unchanged because children are not resolved
+    assert(result.isInstanceOf[Zip])
+  }
+
+  test("CheckAnalysis: different base plans throws ZIP_PLANS_NOT_MERGEABLE") {
+    val base2 = LocalRelation($"x".int, $"y".int, $"z".int, $"w".int)
+    val left = Project(Seq(base.output(0)), base)
+    val right = Project(Seq(base2.output(0)), base2)
+    val zip = Zip(left, right)
+
+    assertAnalysisErrorCondition(
+      zip,
+      expectedErrorCondition = "ZIP_PLANS_NOT_MERGEABLE",
+      expectedMessageParameters = Map.empty
+    )
+  }
+
+  test("resolve Zip: longer chain of selects on both sides") {
+    // Left has 3 nested Projects, right has 1 Project. Both reach the same base.
+    val left = Project(Seq(base.output(0)),
+      Project(Seq(base.output(0), base.output(1)),
+        Project(base.output, base)))
+    val right = Project(Seq(base.output(1)), base)
+    val zip = Zip(left, right)
+
+    val resolved = Resolve.execute(zip)
+    assert(resolved.isInstanceOf[Project], "Asymmetric chain should still merge to a Project")
+    assert(resolved.output.map(_.name) == Seq("a", "b"))
+  }
+
+  test("resolve Zip: chained Project with aliases composes substitutions") {
+    // Build df.select(a + 1 AS x).select(x * 2 AS y) -- outer references the inner alias.
+    val inner = base.select(($"a" + 1).as("x"))
+    val outer = inner.select(($"x" * 2).as("y")).analyze
+    val right = base.select(($"b" * 3).as("z")).analyze
+    val zip = Zip(outer, right)
+
+    val resolved = Resolve.execute(zip)
+    assert(resolved.isInstanceOf[Project], "Aliased chain should still merge to a Project")
+    assert(reachesBase(resolved, base),
+      "Resolved plan should be a Project chain rooted at the shared base")
+    assert(resolved.output.map(_.name) == Seq("y", "z"))
+  }
+
+  test("resolve Zip: different-instance bases with same canonical plan") {
+    // Two LocalRelations with the same schema but distinct exprIds. `sameResult` matches
+    // (canonicalized plans are equal), so this is the only path where `attrMapping` actually
+    // remaps right-side references.
+    val baseB = LocalRelation($"a".int, $"b".int, $"c".int)
+    val left = Project(Seq(base.output(0)), base)
+    val right = baseB.select(($"a" + 1).as("a_plus_1")).analyze
+    val zip = Zip(left, right)
+
+    val resolved = Resolve.execute(zip)
+    assert(resolved.isInstanceOf[Project])
+    assert(reachesBase(resolved, base),
+      "Resolved plan should be rooted at the left base, not the right base")
+    assert(!resolved.exists(_ eq baseB), "Right base should be discarded after merge")
+    assert(resolved.output.map(_.name) == Seq("a", "a_plus_1"))
+  }
+
+  test("CheckAnalysis: non-scalar Python UDF throws ZIP_PLANS_NOT_MERGEABLE") {
+    // A GROUPED_MAP Python UDF in either side's projection breaks the 1:1 row mapping, so
+    // ResolveZip refuses to merge and the surviving Zip must surface ZIP_PLANS_NOT_MERGEABLE
+    // (rather than fall through to the generic unresolved-operator INTERNAL_ERROR).
+    val groupedMapUdf = PythonUDF(
+      "pyUDF",
+      null,
+      IntegerType,
+      Seq(base.output(0)),
+      PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+      udfDeterministic = true)
+    val left = base.select(groupedMapUdf.as("x"))
+    val right = base.select($"b".as("y"))
+    val zip = Zip(left.analyze, right.analyze)
+
+    assertAnalysisErrorCondition(
+      zip,
+      expectedErrorCondition = "ZIP_PLANS_NOT_MERGEABLE",
+      expectedMessageParameters = Map.empty
+    )
+  }
+
+  test("resolve Zip: stacked withColumn-style projections (multiple Project layers)") {
+    // Emulate df.withColumn("d", a + 1).withColumn("e", b * 2) on left:
+    // two passthrough-plus-alias Projects stacked, while right has a single layer.
+    val left = base
+      .select($"a", $"b", $"c", ($"a" + 1).as("d"))
+      .select($"a", $"b", $"c", $"d", ($"b" * 2).as("e"))
+      .analyze
+    val right = base.select($"a", $"b", $"c", ($"c" + 100).as("f")).analyze
+    val zip = Zip(left, right)
+
+    val resolved = Resolve.execute(zip)
+    assert(resolved.isInstanceOf[Project], "Stacked withColumn chain should merge to a Project")
+    assert(reachesBase(resolved, base),
+      "Resolved plan should be a Project chain rooted at the shared base")
+    assert(resolved.output.map(_.name) == Seq("a", "b", "c", "d", "e", "a", "b", "c", "f"))
+  }
+
+  test("resolve Zip: shared-producer dedup preserves each side's output column name") {
+    // Both sides project the same deterministic expression over the shared base, but under
+    // different user-given names. The dedup must merge the producer but keep each side's name.
+    val left = base.select($"a".as("x")).analyze
+    val right = base.select($"a".as("y")).analyze
+    val zip = Zip(left, right)
+
+    val resolved = Resolve.execute(zip)
+    assert(resolved.isInstanceOf[Project])
+    assert(resolved.output.map(_.name) == Seq("x", "y"),
+      s"schema should be [x, y] but got ${resolved.output.map(_.name)}")
+  }
+}
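
The shared-producer dedup exercised by the last test above can be sketched in isolation. This is a toy model, not the rule's implementation: `ToyProducer`, `ProducerDedup`, `dedup`, and `outputColumns` are hypothetical names, and the `body` string is an assumed stand-in for `Alias.child.canonicalized`. The sketch keeps the first producer per distinct body, records which ids were dropped, and still reports every output column under its own name.

```scala
import scala.collection.mutable

// Hypothetical toy model: `body` plays the role of the canonicalized child expression.
final case class ToyProducer(id: Int, name: String, body: String)

object ProducerDedup {
  /** Returns the survivors in first-seen order and a map from dropped id to survivor id. */
  def dedup(producers: Seq[ToyProducer]): (Seq[ToyProducer], Map[Int, Int]) = {
    val kept = mutable.LinkedHashMap.empty[String, ToyProducer]
    val droppedToSurvivor = mutable.Map.empty[Int, Int]
    producers.foreach { p =>
      kept.get(p.body) match {
        case Some(survivor) => droppedToSurvivor(p.id) = survivor.id // duplicate body: drop it
        case None => kept(p.body) = p // first occurrence of this body survives
      }
    }
    (kept.values.toSeq, droppedToSurvivor.toMap)
  }

  /** Each output column keeps its own name; only the value source is redirected. */
  def outputColumns(producers: Seq[ToyProducer]): Seq[(String, Int)] = {
    val (_, dropped) = dedup(producers)
    producers.map(p => p.name -> dropped.getOrElse(p.id, p.id))
  }
}
```

With producers `x` and `y` sharing the body `a`, only the first is kept as a producer, yet `outputColumns` still lists both `x` and `y`, each pointing at the surviving id. A `LinkedHashMap` is used here, as in the rule, so the survivors come out in a deterministic first-seen order.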
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
index 34c685213711..3f7ed1a7c287 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Dataset.scala
@@ -346,6 +346,11 @@ class Dataset[T] private[sql] (
     builder.setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS)
   }
 
+  /** @inheritdoc */
+  def zip(other: sql.Dataset[_]): DataFrame = {
+    throw new UnsupportedOperationException("zip is not supported in Spark Connect")
+  }
+
   /** @inheritdoc */
   def joinWith[U](other: sql.Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
     val joinTypeValue = toJoinType(joinType, skipSemiAnti = true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
index 833b3f451273..6596fcbe091c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
@@ -706,6 +706,11 @@ class Dataset[T] private[sql](
     Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE)
   }
 
+  /** @inheritdoc */
+  def zip(other: sql.Dataset[_]): DataFrame = withPlan {
+    Zip(logicalPlan, other.logicalPlan)
+  }
+
   /** @inheritdoc */
   def joinWith[U](other: sql.Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
     // Creates a Join node and resolve it first, to get join condition resolved, self-join resolved,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala
new file mode 100644
index 000000000000..40720d3ce5c3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameZipSuite.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.Rand
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.functions.{lit, rand, uniform}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.MetadataBuilder
+
+class DataFrameZipSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  test("zip: select different columns from the same DataFrame") {
+    val df = Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("a", "b", "c")
+    val left = df.select("a")
+    val right = df.select("b")
+
+    checkAnswer(
+      left.zip(right),
+      Row(1, 2) :: Row(4, 5) :: Row(7, 8) :: Nil)
+  }
+
+  test("zip: select with expressions over the same DataFrame") {
+    val df = Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b")
+    val left = df.select(($"a" + 1).as("a_plus_1"))
+    val right = df.select(($"b" * 2).as("b_times_2"))
+
+    checkAnswer(
+      left.zip(right),
+      Row(2, 20) :: Row(3, 40) :: Row(4, 60) :: Nil)
+  }
+
+  test("zip: one side selects all columns") {
+    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+    val right = df.select(($"a" + $"b").as("sum"))
+
+    checkAnswer(
+      df.zip(right),
+      Row(1, 2, 3) :: Row(3, 4, 7) :: Nil)
+  }
+
+  test("zip: resolved plan is a Project") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val left = df.select("a")
+    val right = df.select("b")
+    val zipped = left.zip(right)
+
+    assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+  }
+
+  test("zip: different base plans throws AnalysisException") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((3, 4, 5)).toDF("x", "y", "z")
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df1.select("a").zip(df2.select("x")).queryExecution.assertAnalyzed()
+      },
+      condition = "ZIP_PLANS_NOT_MERGEABLE"
+    )
+  }
+
+  test("zip: different base plans from spark.range throws AnalysisException") {
+    val df1 = spark.range(10).toDF("id1")
+    val df2 = spark.range(20).toDF("id2")
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        df1.zip(df2).queryExecution.assertAnalyzed()
+      },
+      condition = "ZIP_PLANS_NOT_MERGEABLE"
+    )
+  }
+
+  test("zip: withColumn on both sides") {
+    val df = Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b")
+    val left = df.withColumn("a_plus_1", $"a" + 1)
+    val right = df.withColumn("b_times_2", $"b" * 2)
+    val zipped = left.zip(right)
+
+    assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+    checkAnswer(
+      zipped,
+      Row(1, 10, 2, 1, 10, 20) ::
+        Row(2, 20, 3, 2, 20, 40) ::
+        Row(3, 30, 4, 3, 30, 60) :: Nil)
+  }
+
+  test("zip: chained withColumn (multiple Project layers on the same side)") {
+    val df = Seq((1, 10), (2, 20)).toDF("a", "b")
+    val left = df
+      .withColumn("a_plus_1", $"a" + 1)
+      .withColumn("a_plus_2", $"a" + 2)
+    val right = df.withColumn("b_times_2", $"b" * 2)
+    val zipped = left.zip(right)
+
+    assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+    checkAnswer(
+      zipped,
+      Row(1, 10, 2, 3, 1, 10, 20) ::
+        Row(2, 20, 3, 4, 2, 20, 40) :: Nil)
+  }
+
+  test("zip: longer chain of selects on both sides") {
+    val df = Seq((1, 2, 3), (4, 5, 6)).toDF("a", "b", "c")
+    val left = df.select("a", "b", "c").select("a", "b").select("a")
+    val right = df.select("c")
+    val zipped = left.zip(right)
+
+    assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+    checkAnswer(zipped, Row(1, 3) :: Row(4, 6) :: Nil)
+  }
+
+  test("zip: parent and child with chain") {
+    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+    val child = df.select(($"a" + 1).as("a_plus_1")).select(($"a_plus_1" * 2).as("doubled"))
+    val zipped = df.zip(child)
+
+    assert(zipped.queryExecution.analyzed.isInstanceOf[Project])
+    checkAnswer(zipped, Row(1, 2, 4) :: Row(3, 4, 8) :: Nil)
+  }
+
+  test("zip: withColumnRenamed on both sides") {
+    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+    val left = df.withColumnRenamed("a", "a1")
+    val right = df.withColumnRenamed("b", "b1")
+
+    checkAnswer(
+      left.zip(right),
+      Row(1, 2, 1, 2) :: Row(3, 4, 3, 4) :: Nil)
+  }
+
+  test("zip: nondeterministic alias used multiple times stays a single expression") {
+    // df.withColumn("r", rand()).withColumn("x", r + r) -- if the rewrite naively inlined
+    // r into r+r, rand() would be evaluated twice per row and x would no longer equal 2*r.
+    // The depth-layered chain keeps each user alias in its own Alias entry, and
+    // CollapseProject's canCollapseExpressions guard refuses to inline a non-deterministic
+    // producer consumed more than once -- so the optimized plan must contain rand() exactly
+    // once.
+    val df = spark.range(10).toDF("id")
+    val left = df.withColumn("r", rand()).withColumn("x", $"r" + $"r").select("x")
+    val right = df.select("id")
+
+    val optimized = left.zip(right).queryExecution.optimizedPlan
+    val randCount = optimized.flatMap { p =>
+      p.expressions.flatMap(_.collect { case _: Rand => 1 })
+    }.sum
+    assert(randCount == 1,
+      s"rand() must appear exactly once after the rewrite, got $randCount; plan:\n$optimized")
+  }
+
+  test("zip: shared parent alias is not double-evaluated") {
+    // Both sides derive from the same `parent` whose alias `r` is a nondeterministic random int.
+    // Each side's chain walk collects `r`, so without de-duplication the layered chain would emit
+    // `r` twice and evaluate the producer once per copy. Assert the optimized plan keeps a single
+    // Rand.
+    val df = spark.range(10).toDF("id")
+    // `uniform(0, 1000000)` (no explicit seed) is a nondeterministic random int producer.
+    val parent = df.withColumn("r", uniform(lit(0), lit(1000000)))
+    val left = parent.select("r")
+    val right = parent.withColumn("x", $"r" + $"r").select("x")
+    val zipped = left.zip(right)
+
+    // Structural guard against duplication: the producer (a Rand inside uniform's replacement)
+    // must appear exactly once in the optimized plan.
+    val optimized = zipped.queryExecution.optimizedPlan
+    val randCount = optimized.flatMap { p =>
+      p.expressions.flatMap(_.collect { case _: Rand => 1 })
+    }.sum
+    assert(randCount == 1,
+      s"the random producer must appear exactly once after the rewrite, got $randCount; " +
+        s"plan:\n$optimized")
+
+    // Result correctness: the emitted `x` must equal `r + r` for the exact `r` emitted in the
+    // output, i.e. the merge wired `x` to the same producer instance that feeds the `r` column.
+    // `r` is an int in [0, 1000000) so `r + r` is exact and cannot overflow Int. Reduce to a
+    // single boolean via distinct.
+    checkAnswer(zipped.select(($"x" === $"r" + $"r").as("ok")).distinct(), Row(true))
+  }
+
+  test("zip: shared-producer dedup preserves output column names") {
+    // When both sides project the same deterministic expression under different names, the
+    // dedup merges the producers but must keep each side's user-given output name intact.
+    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+    // Same column `a` aliased differently on each side.
+    val zipped = df.select($"a".as("x")).zip(df.select($"a".as("y")))
+    assert(zipped.columns === Array("x", "y"),
+      s"expected schema [x, y] but got ${zipped.columns.mkString("[", ", ", "]")}")
+    checkAnswer(zipped, Row(1, 1) :: Row(3, 3) :: Nil)
+
+    // Same expression over shared base, different output names.
+    val zipped2 = df.select(($"a" + 1).as("x")).zip(df.select(($"a" + 1).as("y")))
+    assert(zipped2.columns === Array("x", "y"),
+      s"expected schema [x, y] but got ${zipped2.columns.mkString("[", ", ", "]")}")
+    checkAnswer(zipped2, Row(2, 2) :: Row(4, 4) :: Nil)
+  }
+
+  test("zip: shared-producer dedup preserves each side's output column metadata") {
+    // Both sides project the same producer under different names AND different metadata. The
+    // dedup shares the producer (value), but each output column must keep its own metadata --
+    // the dropped column must not inherit the survivor's.
+    val mx = new MetadataBuilder().putString("desc", "left").build()
+    val my = new MetadataBuilder().putString("desc", "right").build()
+    val df = Seq((1, 2), (3, 4)).toDF("a", "b")
+    val zipped = df.select($"a".as("x", mx)).zip(df.select($"a".as("y", my)))
+    assert(zipped.columns === Array("x", "y"))
+    assert(zipped.schema("x").metadata === mx,
+      s"column x lost its metadata: ${zipped.schema("x").metadata}")
+    assert(zipped.schema("y").metadata === my,
+      s"column y inherited the survivor's metadata: ${zipped.schema("y").metadata}")
+    checkAnswer(zipped, Row(1, 1) :: Row(3, 3) :: Nil)
+  }
+}

