This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 2f9a7678eef7 [SPARK-57088][SQL] Allow non-deterministic ranking 
expression for EXACT NEAREST BY
2f9a7678eef7 is described below

commit 2f9a7678eef7e58983eb956a1fe2b6c88ce85aa9
Author: Zero Qu <[email protected]>
AuthorDate: Wed May 27 13:47:03 2026 +0800

    [SPARK-57088][SQL] Allow non-deterministic ranking expression for EXACT 
NEAREST BY
    
    ### What changes were proposed in this pull request?
    
    Removes the `NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION` 
rejection in `CheckAnalysis` so the `EXACT` mode of `NEAREST BY JOIN` (added in 
SPARK-56395) accepts non-deterministic ranking expressions, the same way 
`APPROX` already does.
    
    Concretely:
    - Drop the `NearestByJoin` arm in `CheckAnalysis` that failed analysis when 
`approx = false` and the ranking expression was non-deterministic.
    - Change `NearestByJoin.allowNonDeterministicExpression` to return `true` 
unconditionally (was previously returning `approx`).
    - Delete the `NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION` error 
condition.
    - Update scaladoc/comments in `NearestByJoin` and `RewriteNearestByJoin` to 
reflect that both modes permit a non-deterministic ranking expression.
    - Update the user-facing docs in `sql-ref-syntax-qry-select-join.md`.
    - Convert the existing rejection tests (Scala, Python, SQL golden) to 
positive tests asserting that EXACT + a non-deterministic ranking expression 
now succeeds.
    
    ### Why are the changes needed?
    
    `APPROX` vs. `EXACT` and determinism are orthogonal concerns:
    - `APPROX` vs. `EXACT` is about the search algorithm contract: `APPROX` 
permits the optimizer to use faster approximate strategies (e.g. indexed ANN); 
`EXACT` forces brute-force evaluation.
    - Determinism is a property of the ranking expression itself. Ordinary 
joins, for example, accept non-deterministic join conditions without forcing 
the user into an "approximate" join.
    
    `EXACT` describes algebraic semantics ("compute the exact top-K according 
to the user's ranking expression"); it does not promise reproducibility across 
runs when the ranking expression is itself non-deterministic. Coupling the two 
was an over-restriction that this PR removes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Queries of the form
    
    ```sql
    SELECT ... FROM left JOIN right EXACT NEAREST k BY {DISTANCE | SIMILARITY} 
<non-deterministic expression>
    ```
    previously failed at analysis with 
`NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION`; they are now accepted 
and evaluated through the same brute-force rewrite as the `APPROX` variant.
    
    The error condition 
`NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION` is removed.
    
    ### How was this patch tested?
    
    - Updated `DataFrameNearestByJoinSuite`: the rejection test is converted to 
a positive test asserting the result count (21/21 passing locally).
    - Updated the PySpark equivalent in `test_nearest_by_join.py`.
    - Updated the SQL golden file `join-nearest-by.sql` (replaced the 
failing-EXACT query with a `COUNT(*)` query mirroring the existing APPROX 
case); regenerated `results/` and `analyzer-results/`. `SQLQueryTestSuite -z 
join-nearest-by` passes (2/2).
    - `RewriteNearestByJoinSuite` (12/12) still passes — the 
materializing-Project path in the optimizer rewrite already handled 
non-deterministic ranking expressions; only the analyzer gate changes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7)
    
    Closes #56128 from zhidongqu-db/allow-exact-no-deter-expr.
    
    Lead-authored-by: Zero Qu <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 2603f6a639394bc604bc8c77f4eb071df6070750)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  5 ----
 docs/sql-ref-syntax-qry-select-join.md             |  4 ++--
 python/pyspark/sql/tests/test_nearest_by_join.py   | 27 ++++++++--------------
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  6 -----
 .../catalyst/optimizer/RewriteNearestByJoin.scala  | 10 ++++----
 .../sql/catalyst/plans/logical/NearestByJoin.scala | 16 ++++---------
 .../analyzer-results/join-nearest-by.sql.out       | 25 ++++++--------------
 .../resources/sql-tests/inputs/join-nearest-by.sql | 14 ++++++-----
 .../sql-tests/results/join-nearest-by.sql.out      | 27 +++++++---------------
 .../spark/sql/DataFrameNearestByJoinSuite.scala    | 25 +++++++++-----------
 10 files changed, 56 insertions(+), 103 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 9c9a657bc6e9..b792afc47b57 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5459,11 +5459,6 @@
           "Nearest-by join is implemented as a bounded cross-product 
internally and is therefore rejected when `spark.sql.crossJoin.enabled = 
false`. Set `spark.sql.crossJoin.enabled = true` to permit it, or rewrite the 
query without nearest-by."
         ]
       },
-      "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
-        "message" : [
-          "EXACT nearest-by join is incompatible with the nondeterministic 
ranking expression <expression>. Use APPROX, or replace the expression with a 
deterministic one."
-        ]
-      },
       "NON_ORDERABLE_RANKING_EXPRESSION" : {
         "message" : [
           "The ranking expression <expression> of type <type> is not 
orderable. Provide an expression that returns an orderable type, such as a 
numeric distance like abs(a.col - b.col) or a numeric similarity score."
diff --git a/docs/sql-ref-syntax-qry-select-join.md 
b/docs/sql-ref-syntax-qry-select-join.md
index a082a13707bd..646297831d1c 100644
--- a/docs/sql-ref-syntax-qry-select-join.md
+++ b/docs/sql-ref-syntax-qry-select-join.md
@@ -61,7 +61,7 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [ 
join_criteria | nearest_by_
 
     `APPROX | EXACT`
 
-    Controls the search algorithm contract. `APPROX` allows the optimizer to 
use faster approximate strategies (such as indexed nearest-neighbor search when 
available). `EXACT` forces brute-force evaluation and requires 
`ranking_expression` to be deterministic.
+    Controls the search algorithm contract. `APPROX` allows the optimizer to 
use faster approximate strategies (such as indexed nearest-neighbor search when 
available). `EXACT` forces brute-force evaluation.
 
     `num_results`
 
@@ -73,7 +73,7 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [ 
join_criteria | nearest_by_
 
     `ranking_expression`
 
-    A scalar expression that returns an orderable type. Must be deterministic 
with `EXACT`; may be nondeterministic with `APPROX` (e.g., `rand()` for 
randomized tie-breaking). The expression is evaluated once per (left, right) 
pair on the brute-force path, so avoid expensive or side-effecting UDFs in 
ranking expressions.
+    A scalar expression that returns an orderable type. The expression is 
evaluated once per (left, right) pair on the brute-force path, so avoid 
expensive or side-effecting UDFs in ranking expressions.
 
     **Performance note.** The current implementation evaluates the full 
cross-product of the left and right sides and bounds memory per left row by 
`num_results`. Per-query work is `O(|left| × |right| × log num_results)`. 
Index-backed approximate strategies (transparent to `APPROX` queries) are 
planned in a future release; until then, pre-filter the right side (e.g. via a 
subquery) when it is large.
 
diff --git a/python/pyspark/sql/tests/test_nearest_by_join.py 
b/python/pyspark/sql/tests/test_nearest_by_join.py
index fdee3043289e..5e5236c213fb 100644
--- a/python/pyspark/sql/tests/test_nearest_by_join.py
+++ b/python/pyspark/sql/tests/test_nearest_by_join.py
@@ -217,24 +217,17 @@ class NearestByJoinTestsMixin:
                 messageParameters={},
             )
 
-    def test_exact_with_nondeterministic_ranking_rejected(self):
+    def test_exact_with_nondeterministic_ranking_accepted(self):
         users, products = self.users, self.products
-        # Use an explicit seed (`rand(0)`) so the rendered expression in the 
error message is
-        # byte-stable. Without it, Spark assigns a random seed at analysis and 
the message
-        # parameter becomes `"(rand(<random-long>) + pscore)"`, which can't be 
asserted on.
-        with self.assertRaises(AnalysisException) as pe:
-            users.nearestByJoin(
-                products,
-                sf.rand(0) + products.pscore,
-                numResults=1,
-                mode="exact",
-                direction="similarity",
-            ).collect()
-        self.check_error(
-            exception=pe.exception,
-            
errorClass="NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
-            messageParameters={"expression": '"(rand(0) + pscore)"'},
-        )
+        # Result rows are nondeterministic; only assert that each left row 
gets exactly one match.
+        count = users.nearestByJoin(
+            products,
+            sf.rand(0) + products.pscore,
+            numResults=1,
+            mode="exact",
+            direction="similarity",
+        ).count()
+        self.assertEqual(count, 3)
 
     def test_streaming_inputs_rejected(self):
         streaming_users = (
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9c4fbd719a96..4e07280f94c9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -701,12 +701,6 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
                 "expression" -> toSQLExpr(rankingExpression),
                 "type" -> toSQLType(rankingExpression.dataType)))
 
-          case j @ NearestByJoin(_, _, _, false, _, rankingExpression, _)
-              if !rankingExpression.deterministic =>
-            j.failAnalysis(
-              errorClass = 
"NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
-              messageParameters = Map("expression" -> 
toSQLExpr(rankingExpression)))
-
           case a: Aggregate =>
             a.groupingExpressions.foreach(
               expression =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala
index e920bbfffc55..ee8e61b457fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala
@@ -56,11 +56,11 @@ import org.apache.spark.sql.catalyst.rules._
  * preserves array order, so the K rows emitted per left row appear best-first 
in the output
  * of this rule. (Downstream operators may reorder.)
  *
- * If `rankingExpression` is nondeterministic (legal only under `APPROX`), an 
extra
- * `Project` is inserted above the `Join` to materialize the value as 
`__ranking__`. The
- * standard projection machinery runs 
`Nondeterministic.initialize(partitionIndex)` on every
- * nondeterministic descendant before any value is evaluated, so `MaxMinByK` 
only ever sees a
- * plain `AttributeReference` and never evaluates a nondeterministic 
expression directly.
+ * If `rankingExpression` is nondeterministic, an extra `Project` is inserted 
above the `Join`
+ * to materialize the value as `__ranking__`. The standard projection 
machinery runs
+ * `Nondeterministic.initialize(partitionIndex)` on every nondeterministic 
descendant before any
+ * value is evaluated, so `MaxMinByK` only ever sees a plain 
`AttributeReference` and never
+ * evaluates a nondeterministic expression directly.
  *
  * Unlike [[RewriteAsOfJoin]], which uses a correlated scalar subquery, this 
rule materializes
  * the cross product directly. A scalar subquery returns a single value per 
left row, so it
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala
index 6a5c94d4a1df..4aaac7dfe546 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala
@@ -41,10 +41,8 @@ object NearestByJoin {
  * @param right The right (base) side of the join, against which each left row 
finds matches.
  * @param joinType  Must be `Inner` or `LeftOuter`. `Inner` drops left rows 
with no matches;
  *                  `LeftOuter` preserves them with `NULL` right-side columns.
- * @param approx  `true` for `APPROX` mode, `false` for `EXACT` mode. `APPROX` 
permits a
- *                nondeterministic `rankingExpression` and is the contract 
future indexed
- *                approximate-nearest-neighbor strategies key off; `EXACT` 
requires
- *                determinism (enforced by `CheckAnalysis`).
+ * @param approx  `true` for `APPROX` mode, `false` for `EXACT` mode. `APPROX` 
is the contract
+ *                future indexed approximate-nearest-neighbor strategies key 
off.
  * @param numResults  The K in top-K: the maximum number of right-side matches 
returned per
  *                    left row. Bounded above by `NearestByJoin.MaxNumResults`.
  * @param rankingExpression  Scalar expression evaluated per (left, right) 
pair. Must return
@@ -66,13 +64,9 @@ case class NearestByJoin(
   require(Seq(Inner, LeftOuter).contains(joinType),
     s"Unsupported nearest-by join type $joinType")
 
-  // `APPROX` mode permits a nondeterministic ranking expression (e.g. 
`rand()` for randomized
-  // tie-breaking). `EXACT` mode requires determinism, and that requirement is 
enforced
-  // separately by the 
`NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION` arm in
-  // `CheckAnalysis`. Returning `approx` from this override is what lets 
APPROX queries pass
-  // the generic `INVALID_NON_DETERMINISTIC_EXPRESSIONS` check that fires on 
operators not on
-  // the analyzer's whitelist.
-  override def allowNonDeterministicExpression: Boolean = approx
+  // Both APPROX and EXACT permit a nondeterministic ranking expression (e.g. 
`rand()` for
+  // randomized tie-breaking, or an external scoring UDF).
+  override def allowNonDeterministicExpression: Boolean = true
 
   // Both left- and right-side attributes are declared nullable to match the 
schema produced
   // by `RewriteNearestByJoin`. Right-side attributes are widened because the 
rewrite
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out
index 48819f172310..3b7d9e55ca87 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out
@@ -286,25 +286,14 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 
 
 -- !query
-SELECT u.user_id, p.product
-FROM users u JOIN products p
-  EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+SELECT COUNT(*) AS num_rows
+FROM (
+  SELECT u.user_id, p.product
+  FROM users u JOIN products p
+    EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+)
 -- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
-  "sqlState" : "42604",
-  "messageParameters" : {
-    "expression" : "\"(rand() + pscore)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 42,
-    "stopIndex" : 106,
-    "fragment" : "JOIN products p\n  EXACT NEAREST 1 BY SIMILARITY rand() + 
p.pscore"
-  } ]
-}
+[Analyzer test output redacted due to nondeterminism]
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql 
b/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql
index 6b3dc63d28e3..40cfa87c4cde 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql
@@ -74,13 +74,15 @@ SELECT u.user_id, p.product
 FROM users u JOIN products p
   APPROX NEAREST 1 BY SIMILARITY map(u.score, p.pscore);
 
--- Error: EXACT mode with nondeterministic ranking expression
-SELECT u.user_id, p.product
-FROM users u JOIN products p
-  EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore;
-
--- APPROX permits a nondeterministic ranking expression (per the SPIP). Rows 
differ run to
+-- Both EXACT and APPROX permit a nondeterministic ranking expression. Rows 
differ run to
 -- run, so we only assert the row count: one match per left row when k = 1.
+SELECT COUNT(*) AS num_rows
+FROM (
+  SELECT u.user_id, p.product
+  FROM users u JOIN products p
+    EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+);
+
 SELECT COUNT(*) AS num_rows
 FROM (
   SELECT u.user_id, p.product
diff --git 
a/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out 
b/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out
index d06fb53686e7..81803d139672 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out
@@ -228,27 +228,16 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 
 
 -- !query
-SELECT u.user_id, p.product
-FROM users u JOIN products p
-  EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+SELECT COUNT(*) AS num_rows
+FROM (
+  SELECT u.user_id, p.product
+  FROM users u JOIN products p
+    EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+)
 -- !query schema
-struct<>
+struct<num_rows:bigint>
 -- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
-  "sqlState" : "42604",
-  "messageParameters" : {
-    "expression" : "\"(rand() + pscore)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 42,
-    "stopIndex" : 106,
-    "fragment" : "JOIN products p\n  EXACT NEAREST 1 BY SIMILARITY rand() + 
p.pscore"
-  } ]
-}
+3
 
 
 -- !query
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala
index b34880b71f5b..271c5eb4552c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala
@@ -425,20 +425,17 @@ class DataFrameNearestByJoinSuite extends QueryTest with 
SharedSparkSession {
         "type" -> "\"MAP<DOUBLE, DOUBLE>\""))
   }
 
-  test("EXACT mode rejects nondeterministic ranking expression") {
+  test("EXACT mode accepts nondeterministic ranking expression") {
     val (users, products) = prepareForNearestByJoin()
-    checkError(
-      exception = intercept[AnalysisException] {
-        users.nearestByJoin(
-          products,
-          rand() + products("pscore"),
-          numResults = 1,
-          joinType = "inner",
-          mode = "exact",
-          direction = "similarity")
-      },
-      condition = "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
-      matchPVals = true,
-      parameters = Map("expression" -> ".*rand.*pscore.*"))
+    val result = users.nearestByJoin(
+      products,
+      rand() + products("pscore"),
+      numResults = 1,
+      joinType = "inner",
+      mode = "exact",
+      direction = "similarity")
+
+    // Result rows are nondeterministic; only assert that each left row gets 
exactly one match.
+    assert(result.count() === 3)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to