This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 45f0d90332bd [SPARK-54151][GEO][PYTHON] Introduce the framework for adding ST functions in PySpark
45f0d90332bd is described below

commit 45f0d90332bdabe9b65544c187b1a91e0381006a
Author: Uros Bojanic <[email protected]>
AuthorDate: Mon Nov 3 12:04:01 2025 -0800

    [SPARK-54151][GEO][PYTHON] Introduce the framework for adding ST functions in PySpark
    
    ### What changes were proposed in this pull request?
    This PR adds rudimentary WKB read/write ST geospatial functions to the PySpark API, and registers the new Python functions in both PySpark and PySpark Connect.
    
    Note that a similar framework was already implemented in Catalyst, as part 
of: https://github.com/apache/spark/pull/52784.
    
    ### Why are the changes needed?
    Establish a minimal ST function framework in the PySpark API, laying the foundation for expanding geospatial function support in the near future.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this PR introduces 3 new Python functions: `st_asbinary`, 
`st_geogfromwkb`, `st_geomfromwkb`.
    
    ### How was this patch tested?
    Added a new PySpark unit test (`test_st_asbinary`) to the existing test suite:
    - `test_functions`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52849 from uros-db/geo-python-functions.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 24110b6f6b1b05176bdb5bde80c8f2144b87c9cc)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../source/reference/pyspark.sql/functions.rst     | 10 +++
 python/pyspark/sql/connect/functions/builtin.py    | 24 +++++++
 python/pyspark/sql/functions/__init__.py           |  5 ++
 python/pyspark/sql/functions/builtin.py            | 73 ++++++++++++++++++++++
 python/pyspark/sql/tests/test_functions.py         | 17 +++++
 5 files changed, 129 insertions(+)

diff --git a/python/docs/source/reference/pyspark.sql/functions.rst 
b/python/docs/source/reference/pyspark.sql/functions.rst
index e4175707aecd..6576c7245e31 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -652,6 +652,16 @@ Misc Functions
     version
 
 
+Geospatial ST Functions
+-----------------------
+.. autosummary::
+    :toctree: api/
+
+    st_asbinary
+    st_geogfromwkb
+    st_geomfromwkb
+
+
 UDF, UDTF and UDT
 -----------------
 .. autosummary::
diff --git a/python/pyspark/sql/connect/functions/builtin.py 
b/python/pyspark/sql/connect/functions/builtin.py
index 1198596fbb5d..2c58ed946a82 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -4783,6 +4783,30 @@ def bitmap_and_agg(col: "ColumnOrName") -> Column:
 bitmap_and_agg.__doc__ = pysparkfuncs.bitmap_and_agg.__doc__
 
 
+# Geospatial ST Functions
+
+
+def st_asbinary(geo: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("st_asbinary", geo)
+
+
+st_asbinary.__doc__ = pysparkfuncs.st_asbinary.__doc__
+
+
+def st_geogfromwkb(wkb: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("st_geogfromwkb", wkb)
+
+
+st_geogfromwkb.__doc__ = pysparkfuncs.st_geogfromwkb.__doc__
+
+
+def st_geomfromwkb(wkb: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("st_geomfromwkb", wkb)
+
+
+st_geomfromwkb.__doc__ = pysparkfuncs.st_geomfromwkb.__doc__
+
+
 # Call Functions
 
 
diff --git a/python/pyspark/sql/functions/__init__.py 
b/python/pyspark/sql/functions/__init__.py
index e1b320c98f7f..df9594f18c96 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -522,6 +522,11 @@ __all__ = [  # noqa: F405
     "UserDefinedFunction",
     "UserDefinedTableFunction",
     "arrow_udf",
+    # Geospatial ST Functions
+    "st_asbinary",
+    "st_geogfromwkb",
+    "st_geomfromwkb",
+    # Call Functions
     "call_udf",
     "pandas_udf",
     "udf",
diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index 1ac3ac23e888..ade6723485e2 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -25901,6 +25901,79 @@ def bucket(numBuckets: Union[Column, int], col: 
"ColumnOrName") -> Column:
     return partitioning.bucket(numBuckets, col)
 
 
+# Geospatial ST Functions
+
+
+@_try_remote_functions
+def st_asbinary(geo: "ColumnOrName") -> Column:
+    """Returns the input GEOGRAPHY or GEOMETRY value in WKB format.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    geo : :class:`~pyspark.sql.Column` or str
+        A geospatial value, either a GEOGRAPHY or a GEOMETRY.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df = 
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
 ['wkb'])  # noqa
+    >>> 
df.select(sf.hex(sf.st_asbinary(sf.st_geogfromwkb('wkb'))).alias('result')).collect()
+    [Row(result='0101000000000000000000F03F0000000000000040')]
+    >>> from pyspark.sql import functions as sf
+    >>> df = 
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
 ['wkb'])  # noqa
+    >>> 
df.select(sf.hex(sf.st_asbinary(sf.st_geomfromwkb('wkb'))).alias('result')).collect()
+    [Row(result='0101000000000000000000F03F0000000000000040')]
+    """
+    return _invoke_function_over_columns("st_asbinary", geo)
+
+
+@_try_remote_functions
+def st_geogfromwkb(wkb: "ColumnOrName") -> Column:
+    """Parses the input WKB description and returns the corresponding 
GEOGRAPHY value.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    wkb : :class:`~pyspark.sql.Column` or str
+        A BINARY value in WKB format, representing a GEOGRAPHY value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df = 
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
 ['wkb'])  # noqa
+    >>> 
df.select(sf.hex(sf.st_asbinary(sf.st_geogfromwkb('wkb'))).alias('result')).collect()
+    [Row(result='0101000000000000000000F03F0000000000000040')]
+    """
+    return _invoke_function_over_columns("st_geogfromwkb", wkb)
+
+
+@_try_remote_functions
+def st_geomfromwkb(wkb: "ColumnOrName") -> Column:
+    """Parses the input WKB description and returns the corresponding GEOMETRY 
value.
+
+    .. versionadded:: 4.1.0
+
+    Parameters
+    ----------
+    wkb : :class:`~pyspark.sql.Column` or str
+        A BINARY value in WKB format, representing a GEOMETRY value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> df = 
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
 ['wkb'])  # noqa
+    >>> 
df.select(sf.hex(sf.st_asbinary(sf.st_geomfromwkb('wkb'))).alias('result')).collect()
+    [Row(result='0101000000000000000000F03F0000000000000040')]
+    """
+    return _invoke_function_over_columns("st_geomfromwkb", wkb)
+
+
+# Call Functions
+
+
 @_try_remote_functions
 def call_udf(udfName: str, *cols: "ColumnOrName") -> Column:
     """
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index 18f824c463c9..6dc0770d3df4 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -2805,6 +2805,23 @@ class FunctionsTestsMixin:
         result_try_validate_utf8 = 
df.select(F.try_validate_utf8(df.a).alias("r"))
         assertDataFrameEqual([Row(r="abc")], result_try_validate_utf8)
 
+    # Geospatial ST Functions
+
+    def test_st_asbinary(self):
+        df = self.spark.createDataFrame(
+            [(bytes.fromhex("0101000000000000000000F03F0000000000000040"),)],
+            ["wkb"],
+        )
+        results = df.select(
+            F.hex(F.st_asbinary(F.st_geogfromwkb("wkb"))),
+            F.hex(F.st_asbinary(F.st_geomfromwkb("wkb"))),
+        ).collect()
+        expected = Row(
+            "0101000000000000000000F03F0000000000000040",
+            "0101000000000000000000F03F0000000000000040",
+        )
+        self.assertEqual(results, [expected])
+
 
 class FunctionsTests(ReusedSQLTestCase, FunctionsTestsMixin):
     pass


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to