This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b66d392689ff [SPARK-57284][PYTHON][SQL] Add Scala/Python bindings for 
vector functions
b66d392689ff is described below

commit b66d392689ffa66cdea7df94a145c3f7068e4a60
Author: Kousuke Saruta <[email protected]>
AuthorDate: Sun Jun 7 01:38:33 2026 +0900

    [SPARK-57284][PYTHON][SQL] Add Scala/Python bindings for vector functions
    
    ### What changes were proposed in this pull request?
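    This PR adds Scala/Python bindings for the following vector functions, 
which were added in SPARK-55030 (#53924), SPARK-55593 (#54368) and SPARK-55031 
(#54011): vector_cosine_similarity, vector_inner_product, vector_l2_distance, 
vector_norm, vector_normalize, vector_avg and vector_sum.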
    
    ### Why are the changes needed?
    For better built-in function parity.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this PR introduces new built-in functions in the Scala and Python APIs.
    
    ### How was this patch tested?
    New tests: `test_vector_functions` in `test_functions.py` and 
"vector functions" in `MiscFunctionsSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Kiro CLI / Claude.
    
    Closes #56322 from sarutak/pyspark-vector-functions.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 .../source/reference/pyspark.sql/functions.rst     |  14 ++
 python/pyspark/sql/connect/functions/builtin.py    |  58 ++++++
 python/pyspark/sql/functions/__init__.py           |   8 +
 python/pyspark/sql/functions/builtin.py            | 224 +++++++++++++++++++++
 python/pyspark/sql/tests/test_functions.py         |  41 ++++
 .../scala/org/apache/spark/sql/functions.scala     |  72 +++++++
 .../org/apache/spark/sql/MiscFunctionsSuite.scala  |  34 ++++
 7 files changed, 451 insertions(+)

diff --git a/python/docs/source/reference/pyspark.sql/functions.rst 
b/python/docs/source/reference/pyspark.sql/functions.rst
index 06f2c47e24bd..4d91d15a87d0 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -725,6 +725,20 @@ Geospatial ST Functions
     st_srid
 
 
+Vector Functions
+----------------
+.. autosummary::
+    :toctree: api/
+
+    vector_cosine_similarity
+    vector_inner_product
+    vector_l2_distance
+    vector_norm
+    vector_normalize
+    vector_avg
+    vector_sum
+
+
 UDF, UDTF and UDT
 -----------------
 .. autosummary::
diff --git a/python/pyspark/sql/connect/functions/builtin.py 
b/python/pyspark/sql/connect/functions/builtin.py
index c1109df1c41a..7c7edef02f0d 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -5570,6 +5570,64 @@ def call_function(funcName: str, *cols: "ColumnOrName") 
-> Column:
 call_function.__doc__ = pysparkfuncs.call_function.__doc__
 
 
+# ---------------------- Vector Functions ----------------------
+
+
+def vector_cosine_similarity(left: "ColumnOrName", right: "ColumnOrName") -> 
Column:
+    return _invoke_function_over_columns("vector_cosine_similarity", left, 
right)
+
+
+vector_cosine_similarity.__doc__ = 
pysparkfuncs.vector_cosine_similarity.__doc__
+
+
+def vector_inner_product(left: "ColumnOrName", right: "ColumnOrName") -> 
Column:
+    return _invoke_function_over_columns("vector_inner_product", left, right)
+
+
+vector_inner_product.__doc__ = pysparkfuncs.vector_inner_product.__doc__
+
+
+def vector_l2_distance(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_l2_distance", left, right)
+
+
+vector_l2_distance.__doc__ = pysparkfuncs.vector_l2_distance.__doc__
+
+
+def vector_norm(vector: "ColumnOrName", degree: Optional["ColumnOrName"] = 
None) -> Column:
+    if degree is None:
+        return _invoke_function_over_columns("vector_norm", vector)
+    else:
+        return _invoke_function_over_columns("vector_norm", vector, degree)
+
+
+vector_norm.__doc__ = pysparkfuncs.vector_norm.__doc__
+
+
+def vector_normalize(vector: "ColumnOrName", degree: Optional["ColumnOrName"] 
= None) -> Column:
+    if degree is None:
+        return _invoke_function_over_columns("vector_normalize", vector)
+    else:
+        return _invoke_function_over_columns("vector_normalize", vector, 
degree)
+
+
+vector_normalize.__doc__ = pysparkfuncs.vector_normalize.__doc__
+
+
+def vector_avg(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_avg", col)
+
+
+vector_avg.__doc__ = pysparkfuncs.vector_avg.__doc__
+
+
+def vector_sum(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("vector_sum", col)
+
+
+vector_sum.__doc__ = pysparkfuncs.vector_sum.__doc__
+
+
 def _test() -> None:
     import sys
     import os
diff --git a/python/pyspark/sql/functions/__init__.py 
b/python/pyspark/sql/functions/__init__.py
index b90b5a26ecb0..9a9b18a29278 100644
--- a/python/pyspark/sql/functions/__init__.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -584,6 +584,14 @@ __all__ = [  # noqa: F405
     "st_geomfromwkb",
     "st_setsrid",
     "st_srid",
+    # Vector Functions
+    "vector_cosine_similarity",
+    "vector_inner_product",
+    "vector_l2_distance",
+    "vector_norm",
+    "vector_normalize",
+    "vector_avg",
+    "vector_sum",
     # Call Functions
     "call_udf",
     "pandas_udf",
diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index 841d422f2026..d0dfb2278565 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -30980,6 +30980,230 @@ def arrow_udtf(
         return _create_pyarrow_udtf(cls=cls, returnType=returnType)
 
 
+# ---------------------- Vector Functions ----------------------
+
+
+@_try_remote_functions
+def vector_cosine_similarity(left: "ColumnOrName", right: "ColumnOrName") -> 
Column:
+    """Returns the cosine similarity between two float vectors.
+    The vectors must have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    left : :class:`~pyspark.sql.Column` or column name
+        first vector column.
+    right : :class:`~pyspark.sql.Column` or column name
+        second vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        cosine similarity as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+    >>> schema = StructType([StructField('a', ArrayType(FloatType())), 
StructField('b', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], 
schema)
+    >>> df.select(sf.vector_cosine_similarity('a', 'b')).first()[0]
+    0.974631...
+    """
+    return _invoke_function_over_columns("vector_cosine_similarity", left, 
right)
+
+
+@_try_remote_functions
+def vector_inner_product(left: "ColumnOrName", right: "ColumnOrName") -> 
Column:
+    """Returns the inner product (dot product) between two float vectors.
+    The vectors must have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    left : :class:`~pyspark.sql.Column` or column name
+        first vector column.
+    right : :class:`~pyspark.sql.Column` or column name
+        second vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        inner product as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+    >>> schema = StructType([StructField('a', ArrayType(FloatType())), 
StructField('b', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], 
schema)
+    >>> df.select(sf.vector_inner_product('a', 'b')).first()[0]
+    32.0
+    """
+    return _invoke_function_over_columns("vector_inner_product", left, right)
+
+
+@_try_remote_functions
+def vector_l2_distance(left: "ColumnOrName", right: "ColumnOrName") -> Column:
+    """Returns the Euclidean (L2) distance between two float vectors.
+    The vectors must have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    left : :class:`~pyspark.sql.Column` or column name
+        first vector column.
+    right : :class:`~pyspark.sql.Column` or column name
+        second vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        L2 distance as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+    >>> schema = StructType([StructField('a', ArrayType(FloatType())), 
StructField('b', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], 
schema)
+    >>> df.select(sf.vector_l2_distance('a', 'b')).first()[0]
+    5.196152...
+    """
+    return _invoke_function_over_columns("vector_l2_distance", left, right)
+
+
+@_try_remote_functions
+def vector_norm(vector: "ColumnOrName", degree: Optional["ColumnOrName"] = 
None) -> Column:
+    """Returns the Lp norm of a float vector using the specified degree.
+    Degree defaults to 2.0 (Euclidean norm) if unspecified.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    vector : :class:`~pyspark.sql.Column` or column name
+        input vector column.
+    degree : :class:`~pyspark.sql.Column` or column name, optional
+        norm degree (1.0 for L1, 2.0 for L2, float('inf') for infinity norm).
+        Defaults to 2.0.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the Lp norm as a float value.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([3.0, 4.0],)], schema)
+    >>> df.select(sf.vector_norm('v', sf.lit(2.0).cast('float'))).first()[0]
+    5.0
+    """
+    if degree is None:
+        return _invoke_function_over_columns("vector_norm", vector)
+    else:
+        return _invoke_function_over_columns("vector_norm", vector, degree)
+
+
+@_try_remote_functions
+def vector_normalize(vector: "ColumnOrName", degree: Optional["ColumnOrName"] 
= None) -> Column:
+    """Normalizes a float vector to unit length using the specified norm 
degree.
+    Degree defaults to 2.0 (Euclidean norm) if unspecified.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    vector : :class:`~pyspark.sql.Column` or column name
+        input vector column.
+    degree : :class:`~pyspark.sql.Column` or column name, optional
+        norm degree (1.0 for L1, 2.0 for L2, float('inf') for infinity norm).
+        Defaults to 2.0.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the normalized vector as an array of floats.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([3.0, 4.0],)], schema)
+    >>> df.select(sf.vector_normalize('v', 
sf.lit(2.0).cast('float'))).first()[0]
+    [0.6..., 0.8...]
+    """
+    if degree is None:
+        return _invoke_function_over_columns("vector_normalize", vector)
+    else:
+        return _invoke_function_over_columns("vector_normalize", vector, 
degree)
+
+
+@_try_remote_functions
+def vector_avg(col: "ColumnOrName") -> Column:
+    """Aggregate function: returns the element-wise mean of float vectors in a 
group.
+    All vectors must have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        input vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the element-wise average vector as an array of floats.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([1.0, 2.0],), ([3.0, 4.0],)], schema)
+    >>> df.select(sf.vector_avg('v')).first()[0]
+    [2.0, 3.0]
+    """
+    return _invoke_function_over_columns("vector_avg", col)
+
+
+@_try_remote_functions
+def vector_sum(col: "ColumnOrName") -> Column:
+    """Aggregate function: returns the element-wise sum of float vectors in a 
group.
+    All vectors must have the same dimension.
+
+    .. versionadded:: 4.3.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or column name
+        input vector column.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the element-wise sum vector as an array of floats.
+
+    Examples
+    --------
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+    >>> schema = StructType([StructField('v', ArrayType(FloatType()))])
+    >>> df = spark.createDataFrame([([1.0, 2.0],), ([3.0, 4.0],)], schema)
+    >>> df.select(sf.vector_sum('v')).first()[0]
+    [4.0, 6.0]
+    """
+    return _invoke_function_over_columns("vector_sum", col)
+
+
 def _test() -> None:
     import doctest
     from pyspark.sql import SparkSession
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index b26163a667c2..f0484cb98c83 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -973,6 +973,47 @@ class FunctionsTestsMixin:
         actual_with_threshold = df.select(F.levenshtein(df.l, df.r, 
2).alias("b"))
         assertDataFrameEqual([Row(b=-1)], actual_with_threshold)
 
+    def test_vector_functions(self):
+        from pyspark.sql.types import ArrayType, FloatType, StructType, 
StructField
+
+        schema = StructType(
+            [
+                StructField("a", ArrayType(FloatType())),
+                StructField("b", ArrayType(FloatType())),
+            ]
+        )
+        df = self.spark.createDataFrame([([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])], 
schema)
+
+        # Similarity/distance functions
+        self.assertAlmostEqual(
+            df.select(F.vector_cosine_similarity("a", "b")).first()[0], 
0.9746318, places=4
+        )
+        self.assertAlmostEqual(
+            df.select(F.vector_inner_product("a", "b")).first()[0], 32.0, 
places=1
+        )
+        self.assertAlmostEqual(
+            df.select(F.vector_l2_distance("a", "b")).first()[0], 5.196152, 
places=4
+        )
+
+        # Norm/normalize functions
+        schema2 = StructType([StructField("v", ArrayType(FloatType()))])
+        df2 = self.spark.createDataFrame([([3.0, 4.0],)], schema2)
+        self.assertAlmostEqual(
+            df2.select(F.vector_norm("v", 
F.lit(2.0).cast("float"))).first()[0], 5.0, places=1
+        )
+        result = df2.select(F.vector_normalize("v", 
F.lit(2.0).cast("float"))).first()[0]
+        self.assertAlmostEqual(result[0], 0.6, places=4)
+        self.assertAlmostEqual(result[1], 0.8, places=4)
+
+        # Aggregate functions
+        df3 = self.spark.createDataFrame([([1.0, 2.0],), ([3.0, 4.0],)], 
schema2)
+        avg_result = df3.select(F.vector_avg("v")).first()[0]
+        self.assertAlmostEqual(avg_result[0], 2.0, places=4)
+        self.assertAlmostEqual(avg_result[1], 3.0, places=4)
+        sum_result = df3.select(F.vector_sum("v")).first()[0]
+        self.assertAlmostEqual(sum_result[0], 4.0, places=4)
+        self.assertAlmostEqual(sum_result[1], 6.0, places=4)
+
     def test_between_function(self):
         df = self.spark.createDataFrame(
             [Row(a=1, b=2, c=3), Row(a=2, b=1, c=3), Row(a=4, b=1, c=4)]
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 7afeedb4439c..115e2c572753 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -11901,6 +11901,78 @@ object functions {
    */
   def unwrap_udt(column: Column): Column = Column.internalFn("unwrap_udt", 
column)
 
+  // ---------------------- Vector Functions ----------------------
+
+  /**
+   * Returns the cosine similarity between two float vectors.
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_cosine_similarity(left: Column, right: Column): Column =
+    Column.fn("vector_cosine_similarity", left, right)
+
+  /**
+   * Returns the inner product (dot product) between two float vectors.
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_inner_product(left: Column, right: Column): Column =
+    Column.fn("vector_inner_product", left, right)
+
+  /**
+   * Returns the Euclidean (L2) distance between two float vectors.
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_l2_distance(left: Column, right: Column): Column =
+    Column.fn("vector_l2_distance", left, right)
+
+  /**
+   * Returns the Lp norm of a float vector. Degree defaults to 2.0 if 
unspecified.
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_norm(vector: Column, degree: Column): Column =
+    Column.fn("vector_norm", vector, degree)
+
+  /**
+   * Returns the Lp norm of a float vector using degree 2.0 (Euclidean norm).
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_norm(vector: Column): Column =
+    Column.fn("vector_norm", vector)
+
+  /**
+   * Normalizes a float vector to unit length. Degree defaults to 2.0 if 
unspecified.
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_normalize(vector: Column, degree: Column): Column =
+    Column.fn("vector_normalize", vector, degree)
+
+  /**
+   * Normalizes a float vector to unit length using degree 2.0 (Euclidean 
norm).
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_normalize(vector: Column): Column =
+    Column.fn("vector_normalize", vector)
+
+  /**
+   * Aggregate function: returns the element-wise mean of float vectors in a 
group.
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_avg(col: Column): Column = Column.fn("vector_avg", col)
+
+  /**
+   * Aggregate function: returns the element-wise sum of float vectors in a 
group.
+   * @group vector_funcs
+   * @since 4.3.0
+   */
+  def vector_sum(col: Column): Column = Column.fn("vector_sum", col)
+
   // scalastyle:off
   // TODO(SPARK-45970): Use @static annotation so Java can access to those
   //   API in the same way. Once we land this fix, should deprecate
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
index fb5d2650327e..cc4aceaed75e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
@@ -285,6 +285,40 @@ class MiscFunctionsSuite extends SharedSparkSession {
     assert(df.selectExpr("random(1)").collect() != null)
     assert(df.select(random(lit(1))).collect() != null)
   }
+
+  test("vector functions") {
+    import org.apache.spark.sql.types.{ArrayType, FloatType, StructField, 
StructType}
+    val schema = StructType(Seq(
+      StructField("a", ArrayType(FloatType)),
+      StructField("b", ArrayType(FloatType))))
+    val df = spark.createDataFrame(
+      java.util.Arrays.asList(Row(Array(1.0f, 2.0f, 3.0f), Array(4.0f, 5.0f, 
6.0f))), schema)
+
+    // Similarity/distance
+    val cos = df.select(vector_cosine_similarity($"a", 
$"b")).first().getFloat(0)
+    assert(math.abs(cos - 0.9746319f) < 0.0001f)
+    val dot = df.select(vector_inner_product($"a", $"b")).first().getFloat(0)
+    assert(math.abs(dot - 32.0f) < 0.01f)
+    val l2 = df.select(vector_l2_distance($"a", $"b")).first().getFloat(0)
+    assert(math.abs(l2 - 5.196152f) < 0.001f)
+
+    // Norm/normalize
+    val schema2 = StructType(Seq(StructField("v", ArrayType(FloatType))))
+    val df2 = spark.createDataFrame(
+      java.util.Arrays.asList(Row(Array(3.0f, 4.0f))), schema2)
+    val norm = df2.select(vector_norm($"v", lit(2.0f))).first().getFloat(0)
+    assert(math.abs(norm - 5.0f) < 0.01f)
+
+    // Aggregate
+    val df3 = spark.createDataFrame(
+      java.util.Arrays.asList(Row(Array(1.0f, 2.0f)), Row(Array(3.0f, 4.0f))), 
schema2)
+    val avg = df3.select(vector_avg($"v")).first().getSeq[Float](0)
+    assert(math.abs(avg(0) - 2.0f) < 0.01f)
+    assert(math.abs(avg(1) - 3.0f) < 0.01f)
+    val sum = df3.select(vector_sum($"v")).first().getSeq[Float](0)
+    assert(math.abs(sum(0) - 4.0f) < 0.01f)
+    assert(math.abs(sum(1) - 6.0f) < 0.01f)
+  }
 }
 
 object ReflectClass {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to