This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 45f0d90332bd [SPARK-54151][GEO][PYTHON] Introduce the framework for
adding ST functions in PySpark
45f0d90332bd is described below
commit 45f0d90332bdabe9b65544c187b1a91e0381006a
Author: Uros Bojanic <[email protected]>
AuthorDate: Mon Nov 3 12:04:01 2025 -0800
[SPARK-54151][GEO][PYTHON] Introduce the framework for adding ST functions
in PySpark
### What changes were proposed in this pull request?
This PR adds rudimentary WKB read/write ST geospatial functions to the PySpark
API and registers the new Python functions in both PySpark and PySpark Connect.
Note that a similar framework was already implemented in Catalyst, as part
of: https://github.com/apache/spark/pull/52784.
### Why are the changes needed?
Establish a minimal ST function framework in the PySpark API, laying the
foundation for expanding geospatial function support in the near future.
### Does this PR introduce _any_ user-facing change?
Yes, this PR introduces 3 new Python functions: `st_asbinary`,
`st_geogfromwkb`, `st_geomfromwkb`.
### How was this patch tested?
Added new PySpark unit test suites:
- `test_functions`
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52849 from uros-db/geo-python-functions.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 24110b6f6b1b05176bdb5bde80c8f2144b87c9cc)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../source/reference/pyspark.sql/functions.rst | 10 +++
python/pyspark/sql/connect/functions/builtin.py | 24 +++++++
python/pyspark/sql/functions/__init__.py | 5 ++
python/pyspark/sql/functions/builtin.py | 73 ++++++++++++++++++++++
python/pyspark/sql/tests/test_functions.py | 17 +++++
5 files changed, 129 insertions(+)
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst
b/python/docs/source/reference/pyspark.sql/functions.rst
index e4175707aecd..6576c7245e31 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -652,6 +652,16 @@ Misc Functions
version
+Geospatial ST Functions
+-----------------------
+.. autosummary::
+ :toctree: api/
+
+ st_asbinary
+ st_geogfromwkb
+ st_geomfromwkb
+
+
UDF, UDTF and UDT
-----------------
.. autosummary::
diff --git a/python/pyspark/sql/connect/functions/builtin.py
b/python/pyspark/sql/connect/functions/builtin.py
index 1198596fbb5d..2c58ed946a82 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -4783,6 +4783,30 @@ def bitmap_and_agg(col: "ColumnOrName") -> Column:
bitmap_and_agg.__doc__ = pysparkfuncs.bitmap_and_agg.__doc__
+# Geospatial ST Functions
+
+
+def st_asbinary(geo: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("st_asbinary", geo)
+
+
+st_asbinary.__doc__ = pysparkfuncs.st_asbinary.__doc__
+
+
+def st_geogfromwkb(wkb: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("st_geogfromwkb", wkb)
+
+
+st_geogfromwkb.__doc__ = pysparkfuncs.st_geogfromwkb.__doc__
+
+
+def st_geomfromwkb(wkb: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("st_geomfromwkb", wkb)
+
+
+st_geomfromwkb.__doc__ = pysparkfuncs.st_geomfromwkb.__doc__
+
+
# Call Functions
diff --git a/python/pyspark/sql/functions/__init__.py
b/python/pyspark/sql/functions/__init__.py
index e1b320c98f7f..df9594f18c96 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -522,6 +522,11 @@ __all__ = [ # noqa: F405
"UserDefinedFunction",
"UserDefinedTableFunction",
"arrow_udf",
+ # Geospatial ST Functions
+ "st_asbinary",
+ "st_geogfromwkb",
+ "st_geomfromwkb",
+ # Call Functions
"call_udf",
"pandas_udf",
"udf",
diff --git a/python/pyspark/sql/functions/builtin.py
b/python/pyspark/sql/functions/builtin.py
index 1ac3ac23e888..ade6723485e2 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -25901,6 +25901,79 @@ def bucket(numBuckets: Union[Column, int], col:
"ColumnOrName") -> Column:
return partitioning.bucket(numBuckets, col)
+# Geospatial ST Functions
+
+
+@_try_remote_functions
+def st_asbinary(geo: "ColumnOrName") -> Column:
+ """Returns the input GEOGRAPHY or GEOMETRY value in WKB format.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ geo : :class:`~pyspark.sql.Column` or str
+ A geospatial value, either a GEOGRAPHY or a GEOMETRY.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df =
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
['wkb']) # noqa
+ >>>
df.select(sf.hex(sf.st_asbinary(sf.st_geogfromwkb('wkb'))).alias('result')).collect()
+ [Row(result='0101000000000000000000F03F0000000000000040')]
+ >>> from pyspark.sql import functions as sf
+ >>> df =
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
['wkb']) # noqa
+ >>>
df.select(sf.hex(sf.st_asbinary(sf.st_geomfromwkb('wkb'))).alias('result')).collect()
+ [Row(result='0101000000000000000000F03F0000000000000040')]
+ """
+ return _invoke_function_over_columns("st_asbinary", geo)
+
+
+@_try_remote_functions
+def st_geogfromwkb(wkb: "ColumnOrName") -> Column:
+ """Parses the input WKB description and returns the corresponding
GEOGRAPHY value.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ wkb : :class:`~pyspark.sql.Column` or str
+ A BINARY value in WKB format, representing a GEOGRAPHY value.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df =
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
['wkb']) # noqa
+ >>>
df.select(sf.hex(sf.st_asbinary(sf.st_geogfromwkb('wkb'))).alias('result')).collect()
+ [Row(result='0101000000000000000000F03F0000000000000040')]
+ """
+ return _invoke_function_over_columns("st_geogfromwkb", wkb)
+
+
+@_try_remote_functions
+def st_geomfromwkb(wkb: "ColumnOrName") -> Column:
+ """Parses the input WKB description and returns the corresponding GEOMETRY
value.
+
+ .. versionadded:: 4.1.0
+
+ Parameters
+ ----------
+ wkb : :class:`~pyspark.sql.Column` or str
+ A BINARY value in WKB format, representing a GEOMETRY value.
+
+ Examples
+ --------
+ >>> from pyspark.sql import functions as sf
+ >>> df =
spark.createDataFrame([(bytes.fromhex('0101000000000000000000F03F0000000000000040'),)],
['wkb']) # noqa
+ >>>
df.select(sf.hex(sf.st_asbinary(sf.st_geomfromwkb('wkb'))).alias('result')).collect()
+ [Row(result='0101000000000000000000F03F0000000000000040')]
+ """
+ return _invoke_function_over_columns("st_geomfromwkb", wkb)
+
+
+# Call Functions
+
+
@_try_remote_functions
def call_udf(udfName: str, *cols: "ColumnOrName") -> Column:
"""
diff --git a/python/pyspark/sql/tests/test_functions.py
b/python/pyspark/sql/tests/test_functions.py
index 18f824c463c9..6dc0770d3df4 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -2805,6 +2805,23 @@ class FunctionsTestsMixin:
result_try_validate_utf8 =
df.select(F.try_validate_utf8(df.a).alias("r"))
assertDataFrameEqual([Row(r="abc")], result_try_validate_utf8)
+ # Geospatial ST Functions
+
+ def test_st_asbinary(self):
+ df = self.spark.createDataFrame(
+ [(bytes.fromhex("0101000000000000000000F03F0000000000000040"),)],
+ ["wkb"],
+ )
+ results = df.select(
+ F.hex(F.st_asbinary(F.st_geogfromwkb("wkb"))),
+ F.hex(F.st_asbinary(F.st_geomfromwkb("wkb"))),
+ ).collect()
+ expected = Row(
+ "0101000000000000000000F03F0000000000000040",
+ "0101000000000000000000F03F0000000000000040",
+ )
+ self.assertEqual(results, [expected])
+
class FunctionsTests(ReusedSQLTestCase, FunctionsTestsMixin):
pass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]