This is an automated email from the ASF dual-hosted git repository.

HyukjinKwon pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 86052af7039f [SPARK-40336][PS] Implement DataFrameGroupBy.cov
86052af7039f is described below

commit 86052af7039fd15d89d160ed174597cbb367997a
Author: Devin Petersohn <[email protected]>
AuthorDate: Mon May 11 06:54:03 2026 +0900

    [SPARK-40336][PS] Implement DataFrameGroupBy.cov
    
    ### What changes were proposed in this pull request?
    
    Implement `DataFrameGroupBy.cov(min_periods, ddof, numeric_only)` in the pandas API on Spark.
    
    ### Why are the changes needed?
    
    Missing API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    Closes #55775 from devin-petersohn/devin/groupby-cov.
    
    Authored-by: Devin Petersohn <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit 7041d1eb72e048545d94b37be05fa6b3c5af8586)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 dev/sparktestsupport/modules.py                    |   2 +
 .../source/reference/pyspark.pandas/groupby.rst    |   1 +
 python/pyspark/pandas/groupby.py                   | 210 +++++++++++++++++++++
 python/pyspark/pandas/missing/groupby.py           |   1 -
 .../tests/connect/groupby/test_parity_cov.py       |  34 ++++
 python/pyspark/pandas/tests/groupby/test_cov.py    | 124 ++++++++++++
 6 files changed, 371 insertions(+), 1 deletion(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 664b0d81840a..693d43f10f57 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1045,6 +1045,7 @@ pyspark_pandas_slow = Module(
         "pyspark.pandas.tests.groupby.test_aggregate",
         "pyspark.pandas.tests.groupby.test_apply_func",
         "pyspark.pandas.tests.groupby.test_corr",
+        "pyspark.pandas.tests.groupby.test_cov",
         "pyspark.pandas.tests.groupby.test_cumulative",
         "pyspark.pandas.tests.groupby.test_describe",
         "pyspark.pandas.tests.groupby.test_groupby",
@@ -1477,6 +1478,7 @@ pyspark_pandas_slow_connect = Module(
         "pyspark.pandas.tests.connect.groupby.test_parity_aggregate",
         "pyspark.pandas.tests.connect.groupby.test_parity_apply_func",
         "pyspark.pandas.tests.connect.groupby.test_parity_corr",
+        "pyspark.pandas.tests.connect.groupby.test_parity_cov",
         "pyspark.pandas.tests.connect.groupby.test_parity_cumulative",
         "pyspark.pandas.tests.connect.groupby.test_parity_missing_data",
         "pyspark.pandas.tests.connect.groupby.test_parity_split_apply",
diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst
index f86a7572666b..02b9174ad0ab 100644
--- a/python/docs/source/reference/pyspark.pandas/groupby.rst
+++ b/python/docs/source/reference/pyspark.pandas/groupby.rst
@@ -97,6 +97,7 @@ The following methods are available only for `DataFrameGroupBy` objects.
 .. autosummary::
    :toctree: api/
 
+   DataFrameGroupBy.cov
    DataFrameGroupBy.describe
 
 The following methods are available only for `SeriesGroupBy` objects.
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index c5b81f05cc57..2cfa1ae2316b 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -4303,6 +4303,216 @@ class DataFrameGroupBy(GroupBy[DataFrame]):
             )
         )
 
+    @with_ansi_mode_context
+    def cov(
+        self,
+        min_periods: Optional[int] = None,
+        ddof: int = 1,
+        numeric_only: bool = False,
+    ) -> "DataFrame":
+        """
+        Compute pairwise covariance of columns, excluding NA/null values.
+
+        The returned DataFrame is the covariance matrix of the columns
+        of the DataFrame within each group.
+
+        Both NA and null values are automatically excluded from the
+        calculation. A threshold can be set for the minimum number of
+        observations for each value created. Comparisons with observations
+        below this threshold will be returned as ``NaN``.
+
+        .. versionadded:: 4.3.0
+
+        Parameters
+        ----------
+        min_periods : int, optional
+            Minimum number of observations required per pair of columns
+            to have a valid result.
+
+        ddof : int, default 1
+            Delta degrees of freedom. The divisor used in calculations
+            is ``N - ddof``, where ``N`` represents the number of elements.
+
+        numeric_only : bool, default False
+            Include only `float`, `int` or `boolean` data.
+
+        Returns
+        -------
+        DataFrame
+            The covariance matrix of the series of the DataFrame within each group.
+
+        See Also
+        --------
+        DataFrame.cov
+        Series.cov
+
+        Examples
+        --------
+        >>> df = ps.DataFrame(
+        ...     {"A": [1, 1, 2, 2, 2], "B": [1, 2, 3, 4, 5], "C": [4, 6, 7, 9, 11]},
+        ...     columns=["A", "B", "C"])
+        >>> df.groupby("A").cov().sort_index()
+                B    C
+        A
+        1 B  0.5  1.0
+          C  1.0  2.0
+        2 B  1.0  2.0
+          C  2.0  4.0
+
+        >>> df.groupby("A").cov(ddof=0).sort_index()
+                    B         C
+        A
+        1 B  0.250000  0.500000
+          C  0.500000  1.000000
+        2 B  0.666667  1.333333
+          C  1.333333  2.666667
+
+        >>> df.groupby("A").cov(min_periods=3).sort_index()
+                B    C
+        A
+        1 B  NaN  NaN
+          C  NaN  NaN
+        2 B  1.0  2.0
+          C  2.0  4.0
+        """
+        if not isinstance(ddof, int):
+            raise TypeError("ddof must be integer")
+        if LooseVersion(pd.__version__) >= "3.0.0":
+            if not isinstance(numeric_only, bool):
+                raise ValueError("numeric_only accepts only Boolean values")
+        min_periods = 1 if min_periods is None else min_periods
+
+        groupkey_names: List[str] = [str(key.name) for key in self._groupkeys]
+        internal, _, _ = self._prepare_reduce(
+            groupkey_names=groupkey_names,
+            accepted_spark_types=(NumericType, BooleanType) if numeric_only 
else None,
+            bool_to_numeric=False,
+        )
+
+        numeric_labels = [
+            label
+            for label in internal.column_labels
+            if isinstance(internal.spark_type_for(label), (NumericType, 
BooleanType))
+        ]
+        numeric_scols: List[Column] = [
+            internal.spark_column_for(label).cast("double") for label in 
numeric_labels
+        ]
+        numeric_col_names: List[str] = [name_like_string(label) for label in 
numeric_labels]
+        num_scols = len(numeric_scols)
+
+        sdf = internal.spark_frame
+        index_1_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_index_1_temp_column__")
+        index_2_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_index_2_temp_column__")
+        value_1_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_value_1_temp_column__")
+        value_2_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_value_2_temp_column__")
+        cov_output_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_output_temp_column__")
+        count_output_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_count_temp_column__")
+
+        pair_scols: List[Column] = []
+        for i in range(0, num_scols):
+            for j in range(i, num_scols):
+                pair_scols.append(
+                    F.struct(
+                        F.lit(i).alias(index_1_col_name),
+                        F.lit(j).alias(index_2_col_name),
+                        numeric_scols[i].alias(value_1_col_name),
+                        numeric_scols[j].alias(value_2_col_name),
+                    )
+                )
+
+        sdf = sdf.select(*[F.col(key) for key in groupkey_names], 
*[F.inline(F.array(*pair_scols))])
+
+        # Null both values when either is null so pairwise non-null counts are accurate.
+        sdf = sdf.select(
+            *[F.col(key) for key in groupkey_names + [index_1_col_name, 
index_2_col_name]],
+            F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name), 
F.lit(None))
+            .otherwise(F.col(value_1_col_name))
+            .alias(value_1_col_name),
+            F.when(F.isnull(value_1_col_name) | F.isnull(value_2_col_name), 
F.lit(None))
+            .otherwise(F.col(value_2_col_name))
+            .alias(value_2_col_name),
+        )
+
+        sdf = sdf.groupby(groupkey_names + [index_1_col_name, 
index_2_col_name]).agg(
+            SF.covar(F.col(value_1_col_name), F.col(value_2_col_name), 
ddof).alias(
+                cov_output_col_name
+            ),
+            F.count(F.when(~F.isnull(value_1_col_name), 
1)).alias(count_output_col_name),
+        )
+
+        sdf = sdf.withColumn(
+            cov_output_col_name,
+            F.when(F.col(count_output_col_name) < min_periods, 
F.lit(None)).otherwise(
+                F.col(cov_output_col_name)
+            ),
+        )
+
+        # Mirror the (i, j) pair to (j, i) to fill in the lower triangle of the matrix.
+        auxiliary_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_auxiliary_temp_column__")
+        sdf = sdf.withColumn(
+            auxiliary_col_name,
+            F.explode(
+                F.when(
+                    F.col(index_1_col_name) == F.col(index_2_col_name),
+                    F.lit([0]),
+                ).otherwise(F.lit([0, 1]))
+            ),
+        ).select(
+            *[F.col(key) for key in groupkey_names],
+            *[
+                F.when(F.col(auxiliary_col_name) == 0, F.col(index_1_col_name))
+                .otherwise(F.col(index_2_col_name))
+                .alias(index_1_col_name),
+                F.when(F.col(auxiliary_col_name) == 0, F.col(index_2_col_name))
+                .otherwise(F.col(index_1_col_name))
+                .alias(index_2_col_name),
+                F.col(cov_output_col_name),
+            ],
+        )
+
+        array_col_name = verify_temp_column_name(sdf, 
"__groupby_cov_array_temp_column__")
+        sdf = sdf.groupby(groupkey_names + [index_1_col_name]).agg(
+            F.array_sort(
+                F.collect_list(
+                    F.struct(
+                        F.col(index_2_col_name),
+                        F.col(cov_output_col_name),
+                    )
+                )
+            ).alias(array_col_name)
+        )
+
+        for i in range(0, num_scols):
+            sdf = sdf.withColumn(auxiliary_col_name, 
F.get(F.col(array_col_name), i)).withColumn(
+                numeric_col_names[i],
+                F.col(f"{auxiliary_col_name}.{cov_output_col_name}"),
+            )
+
+        sdf = sdf.orderBy(groupkey_names + [index_1_col_name])
+
+        sdf = sdf.select(
+            *[F.col(col) for col in groupkey_names + numeric_col_names],
+            *[
+                F.get(F.lit(numeric_col_names), 
F.col(index_1_col_name)).alias(auxiliary_col_name),
+                
F.monotonically_increasing_id().alias(NATURAL_ORDER_COLUMN_NAME),
+            ],
+        )
+
+        return DataFrame(
+            InternalFrame(
+                spark_frame=sdf,
+                index_spark_columns=[
+                    scol_for(sdf, key) for key in groupkey_names + 
[auxiliary_col_name]
+                ],
+                index_names=(
+                    [psser._column_label for psser in self._groupkeys]
+                    + self._psdf._internal.index_names
+                ),
+                column_labels=numeric_labels,
+                column_label_names=internal.column_label_names,
+            )
+        )
+
 
 class SeriesGroupBy(GroupBy[Series]):
     @staticmethod
diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py
index 04891006dee7..9e655384dfac 100644
--- a/python/pyspark/pandas/missing/groupby.py
+++ b/python/pyspark/pandas/missing/groupby.py
@@ -42,7 +42,6 @@ class MissingPandasLikeDataFrameGroupBy:
 
     # Properties
     corrwith = _unsupported_property("corrwith")
-    cov = _unsupported_property("cov")
     dtypes = _unsupported_property("dtypes")
     groups = _unsupported_property("groups")
     hist = _unsupported_property("hist")
diff --git a/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py b/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py
new file mode 100644
index 000000000000..389a50ff6e96
--- /dev/null
+++ b/python/pyspark/pandas/tests/connect/groupby/test_parity_cov.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.pandas.tests.groupby.test_cov import CovMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+
+
+class CovParityTests(
+    CovMixin,
+    PandasOnSparkTestUtils,
+    ReusedConnectTestCase,
+):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.testing import main
+
+    main()
diff --git a/python/pyspark/pandas/tests/groupby/test_cov.py b/python/pyspark/pandas/tests/groupby/test_cov.py
new file mode 100644
index 000000000000..849e469b0500
--- /dev/null
+++ b/python/pyspark/pandas/tests/groupby/test_cov.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.loose_version import LooseVersion
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+from pyspark.testing.sqlutils import SQLTestUtils
+
+
+class CovMixin:
+    @property
+    def pdf(self):
+        return pd.DataFrame(
+            {
+                "A": [1, 1, 2, 2, 2, 3],
+                "B": [-1, 2, 3, 5, 6, 0],
+                "C": [4, 6, 5, 1, 3, 0],
+            },
+            columns=["A", "B", "C"],
+        )
+
+    @property
+    def psdf(self):
+        return ps.from_pandas(self.pdf)
+
+    def test_cov(self):
+        for c in ["A", "B", "C"]:
+            self.assert_eq(
+                self.pdf.groupby(c).cov().sort_index(),
+                self.psdf.groupby(c).cov().sort_index(),
+                almost=True,
+            )
+
+    def test_ddof(self):
+        # Use a dataset with enough rows per group to keep N - ddof > 0,
+        # since pandas and Spark diverge on inf/NaN handling when the divisor is non-positive.
+        pdf = pd.DataFrame(
+            {
+                "A": [1, 1, 1, 1, 2, 2, 2, 2],
+                "B": [1, 2, 3, 4, 5, 6, 7, 8],
+                "C": [4, 6, 5, 1, 3, 0, 9, 2],
+            },
+            columns=["A", "B", "C"],
+        )
+        psdf = ps.from_pandas(pdf)
+        for ddof in [0, 1, 2]:
+            self.assert_eq(
+                pdf.groupby("A").cov(ddof=ddof).sort_index(),
+                psdf.groupby("A").cov(ddof=ddof).sort_index(),
+                almost=True,
+            )
+
+    def test_min_periods(self):
+        pdf = pd.DataFrame(
+            {
+                "A": [1, 1, 1, 1, 2, 2],
+                "B": [1.0, 2.0, np.nan, 4.0, 5.0, 6.0],
+                "C": [4.0, 6.0, 5.0, 7.0, 1.0, 3.0],
+            },
+            columns=["A", "B", "C"],
+        )
+        psdf = ps.from_pandas(pdf)
+        for m in [1, 2, 3, 4]:
+            self.assert_eq(
+                pdf.groupby("A").cov(min_periods=m).sort_index(),
+                psdf.groupby("A").cov(min_periods=m).sort_index(),
+                almost=True,
+            )
+
+    def test_numeric_only(self):
+        pdf = pd.DataFrame(
+            {
+                "A": [1, 1, 2, 2, 2],
+                "B": [1, 2, 3, 4, 5],
+                "C": [4, 6, 5, 1, 3],
+                "D": ["x", "y", "z", "w", "v"],
+            },
+            columns=["A", "B", "C", "D"],
+        )
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(
+            pdf.groupby("A").cov(numeric_only=True).sort_index(),
+            psdf.groupby("A").cov(numeric_only=True).sort_index(),
+            almost=True,
+        )
+
+    def test_invalid_args(self):
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            self.psdf.groupby("A").cov(ddof="1")
+
+        if LooseVersion(pd.__version__) >= "3.0.0":
+            with self.assertRaisesRegex(ValueError, "numeric_only accepts only 
Boolean values"):
+                self.psdf.groupby("A").cov(numeric_only="True")
+
+
+class CovTests(
+    CovMixin,
+    PandasOnSparkTestCase,
+    SQLTestUtils,
+):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.testing import main
+
+    main()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to