This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new a3d3efa0 feat(rust/sedona-spatial-join): Add config to disable spatial join reordering (#733)
a3d3efa0 is described below

commit a3d3efa0eef37a0ce30c41ee41fef05fba421c2b
Author: Yongting You <[email protected]>
AuthorDate: Sat Mar 28 23:19:39 2026 +0800

    feat(rust/sedona-spatial-join): Add config to disable spatial join reordering (#733)
---
 python/sedonadb/tests/test_sjoin.py                | 93 +++++++++++++++++++++-
 rust/sedona-common/src/option.rs                   |  5 ++
 .../src/planner/physical_planner.rs                | 29 +++++--
 .../tests/spatial_join_integration.rs              | 33 +++++++-
 4 files changed, 151 insertions(+), 9 deletions(-)

diff --git a/python/sedonadb/tests/test_sjoin.py b/python/sedonadb/tests/test_sjoin.py
index 346fe0eb..607ff93e 100644
--- a/python/sedonadb/tests/test_sjoin.py
+++ b/python/sedonadb/tests/test_sjoin.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import json
+import re
 import warnings
 
 import geopandas as gpd
@@ -23,7 +24,7 @@ import numpy as np
 import pandas as pd
 import pytest
 import sedonadb
-from sedonadb.testing import PostGIS, SedonaDB, random_geometry
+from sedonadb.testing import PostGIS, SedonaDB, random_geometry, 
skip_if_not_exists
 from shapely.geometry import Point
 
 
@@ -68,6 +69,96 @@ def test_spatial_join(join_type, on):
         eng_postgis.assert_query_result(sql, sedonadb_results)
 
 
+def _plan_text(df):
+    query_plan = df.to_pandas()
+    return "\n".join(query_plan.iloc[:, 1].astype(str).tolist())
+
+
+def _spatial_join_side_file_names(plan_text):
+    """Extract the left/right parquet file names used by `SpatialJoinExec`.
+
+    Example input:
+        SpatialJoinExec: join_type=Inner, on=ST_intersects(geo_right@0, 
geo_left@0)
+          ProjectionExec: expr=[geometry@0 as geo_right]
+            DataSourceExec: file_groups={1 group: 
[[.../natural-earth_countries_geo.parquet]]}, projection=[geometry], 
file_type=parquet
+          ProbeShuffleExec: partitioning=RoundRobinBatch(1)
+            ProjectionExec: expr=[geometry@0 as geo_left]
+              DataSourceExec: file_groups={1 group: 
[[.../natural-earth_cities_geo.parquet]]}, projection=[geometry], 
file_type=parquet
+
+    Example output:
+        ["natural-earth_countries_geo", "natural-earth_cities_geo"]
+    """
+    spatial_join_idx = plan_text.find("SpatialJoinExec:")
+    assert spatial_join_idx != -1, plan_text
+
+    file_names = re.findall(
+        r"DataSourceExec:.*?/([^/\]]+)\.parquet", plan_text[spatial_join_idx:]
+    )
+    assert len(file_names) >= 2, plan_text
+    return file_names[:2]
+
+
+def test_spatial_join_reordering_can_be_disabled_e2e(geoarrow_data):
+    path_left = (
+        geoarrow_data / "natural-earth" / "files" / 
"natural-earth_cities_geo.parquet"
+    )
+    path_right = (
+        geoarrow_data
+        / "natural-earth"
+        / "files"
+        / "natural-earth_countries_geo.parquet"
+    )
+    skip_if_not_exists(path_left)
+    skip_if_not_exists(path_right)
+
+    with SedonaDB.create_or_skip() as eng_sedonadb:
+        sql = f"""
+            SELECT t1.name
+            FROM '{path_left}' AS t1
+            JOIN '{path_right}' AS t2
+            ON ST_Intersects(t1.geometry, t2.geometry)
+        """
+
+        # Test 1: regular run swaps the join order
+        plan_text = _plan_text(eng_sedonadb.con.sql(f"EXPLAIN {sql}"))
+        print(f"Plan with reordering enabled:\n{plan_text}")
+        assert _spatial_join_side_file_names(plan_text) == [
+            "natural-earth_countries_geo",
+            "natural-earth_cities_geo",
+        ], plan_text
+
+        result_with_reordering = (
+            eng_sedonadb.execute_and_collect(sql)
+            .to_pandas()
+            .sort_values("name")
+            .reset_index(drop=True)
+        )
+        assert len(result_with_reordering) > 0
+
+        # Test 2: with config disabled, join won't reorder
+        eng_sedonadb.con.sql(
+            "SET sedona.spatial_join.spatial_join_reordering TO false"
+        ).execute()
+
+        plan_text = _plan_text(eng_sedonadb.con.sql(f"EXPLAIN {sql}"))
+        print(f"Plan with reordering disabled:\n{plan_text}")
+        assert _spatial_join_side_file_names(plan_text) == [
+            "natural-earth_cities_geo",
+            "natural-earth_countries_geo",
+        ], plan_text
+
+        result_without_reordering = (
+            eng_sedonadb.execute_and_collect(sql)
+            .to_pandas()
+            .sort_values("name")
+            .reset_index(drop=True)
+        )
+        pd.testing.assert_frame_equal(
+            result_without_reordering,
+            result_with_reordering,
+        )
+
+
 @pytest.mark.parametrize(
     "join_type",
     [
diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs
index 21b228d4..157b160a 100644
--- a/rust/sedona-common/src/option.rs
+++ b/rust/sedona-common/src/option.rs
@@ -82,6 +82,11 @@ config_namespace! {
         /// locality might cause imbalanced partitions when running 
out-of-core spatial join.
         pub repartition_probe_side: bool, default = true
 
+        /// Reorder spatial join inputs to put the smaller input on the build 
side
+        /// when statistics are available. If set to `false`, spatial joins
+        /// preserve the original query order.
+        pub spatial_join_reordering: bool, default = true
+
         /// Maximum number of sample bounding boxes collected from the index 
side for partitioning the
         /// data when running out-of-core spatial join
         pub max_index_side_bbox_samples: usize, default = 10000
diff --git a/rust/sedona-spatial-join/src/planner/physical_planner.rs b/rust/sedona-spatial-join/src/planner/physical_planner.rs
index b99fe4e8..0eb7d0d8 100644
--- a/rust/sedona-spatial-join/src/planner/physical_planner.rs
+++ b/rust/sedona-spatial-join/src/planner/physical_planner.rs
@@ -40,7 +40,7 @@ use crate::planner::logical_plan_node::SpatialJoinPlanNode;
 use crate::planner::probe_shuffle_exec::ProbeShuffleExec;
 use crate::planner::spatial_expr_utils::{is_spatial_predicate_supported, 
transform_join_filter};
 use crate::spatial_predicate::SpatialPredicate;
-use sedona_common::option::SedonaOptions;
+use sedona_common::option::{SedonaOptions, SpatialJoinOptions};
 
 /// Registers a query planner that can produce [`SpatialJoinExec`] from a 
logical extension node.
 pub(crate) fn register_spatial_join_planner(builder: SessionStateBuilder) -> 
SessionStateBuilder {
@@ -102,6 +102,7 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
         else {
             return sedona_internal_err!("SedonaOptions not found in session 
state extensions");
         };
+        let spatial_join_options = &ext.spatial_join;
 
         if !ext.spatial_join.enable {
             return sedona_internal_err!("Spatial join is disabled in 
SedonaOptions");
@@ -151,14 +152,18 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
 
         let should_swap = !matches!(spatial_predicate, 
SpatialPredicate::KNearestNeighbors(_))
             && join_type.supports_swap()
-            && should_swap_join_order(physical_left.as_ref(), 
physical_right.as_ref())?;
+            && should_swap_join_order(
+                spatial_join_options,
+                physical_left.as_ref(),
+                physical_right.as_ref(),
+            )?;
 
         // Repartition the probe side when enabled. This breaks spatial 
locality in sorted/skewed
         // datasets, leading to more balanced workloads during out-of-core 
spatial join.
         // We determine which pre-swap input will be the probe AFTER any 
potential swap, and
         // repartition it here. swap_inputs() will then carry the 
RepartitionExec to the correct
         // child position.
-        let (physical_left, physical_right) = if 
ext.spatial_join.repartition_probe_side {
+        let (physical_left, physical_right) = if 
spatial_join_options.repartition_probe_side {
             repartition_probe_side(
                 physical_left,
                 physical_right,
@@ -176,7 +181,7 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
             remainder,
             join_type,
             None,
-            &ext.spatial_join,
+            spatial_join_options,
         )?;
 
         if should_swap {
@@ -192,8 +197,20 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
 ///    produce a smaller and more efficient spatial index (R-tree).
 /// 2. If row-count statistics are unavailable (for example, for CSV sources),
 ///    fall back to total input size as an estimate.
-/// 3. Do not swap the join order if no relevant statistics are available.
-fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) 
-> Result<bool> {
+/// 3. Do not swap the join order if join reordering is disabled or no relevant
+///    statistics are available.
+fn should_swap_join_order(
+    spatial_join_options: &SpatialJoinOptions,
+    left: &dyn ExecutionPlan,
+    right: &dyn ExecutionPlan,
+) -> Result<bool> {
+    if !spatial_join_options.spatial_join_reordering {
+        log::info!(
+            "spatial join swap heuristic disabled via 
sedona.spatial_join.spatial_join_reordering"
+        );
+        return Ok(false);
+    }
+
     let left_stats = left.partition_statistics(None)?;
     let right_stats = right.partition_statistics(None)?;
 
diff --git a/rust/sedona-spatial-join/tests/spatial_join_integration.rs b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
index da54f52e..60f0033b 100644
--- a/rust/sedona-spatial-join/tests/spatial_join_integration.rs
+++ b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
@@ -283,6 +283,7 @@ fn single_row_table(schema: SchemaRef, id: i32, marker: 
&str) -> Result<Arc<dyn
 // Keep the data fixed and vary only the advertised stats so the planner swap
 // decision is explained entirely by the heuristic under test.
 async fn assert_build_side_from_stats(
+    options: SpatialJoinOptions,
     left_num_rows: Option<usize>,
     right_num_rows: Option<usize>,
     left_total_byte_size: Option<usize>,
@@ -309,7 +310,7 @@ async fn assert_build_side_from_stats(
         stats_with(right_schema.as_ref(), right_num_rows, 
right_total_byte_size),
     ));
 
-    let ctx = setup_context(Some(SpatialJoinOptions::default()), 10)?;
+    let ctx = setup_context(Some(options), 10)?;
     ctx.register_table("L", left_provider)?;
     ctx.register_table("R", right_provider)?;
 
@@ -692,6 +693,7 @@ async fn test_spatial_join_swap_inputs_produces_same_plan(
 // smaller-row input on the build side even if it is larger by byte size.
 async fn test_spatial_join_reordering_uses_row_count() -> Result<()> {
     assert_build_side_from_stats(
+        SpatialJoinOptions::default(),
         Some(100),
         Some(10),
         Some(100),
@@ -706,6 +708,7 @@ async fn test_spatial_join_reordering_uses_row_count() -> 
Result<()> {
 // smaller-bytes input on the build side.
 async fn test_spatial_join_reordering_uses_size_fallback() -> Result<()> {
     assert_build_side_from_stats(
+        SpatialJoinOptions::default(),
         None,
         None,
         Some(10_000),
@@ -719,7 +722,33 @@ async fn test_spatial_join_reordering_uses_size_fallback() 
-> Result<()> {
 // When both row count and size are absent, the planner preserves the original
 // join order.
 async fn test_spatial_join_reordering_preserves_order_without_stats() -> 
Result<()> {
-    assert_build_side_from_stats(None, None, None, None, 
OriginalInputSide::Left).await
+    assert_build_side_from_stats(
+        SpatialJoinOptions::default(),
+        None,
+        None,
+        None,
+        None,
+        OriginalInputSide::Left,
+    )
+    .await
+}
+
+#[tokio::test]
+// When join reordering is disabled, the planner preserves the original join
+// order even if statistics would normally trigger a swap.
+async fn test_spatial_join_reordering_can_be_disabled() -> Result<()> {
+    assert_build_side_from_stats(
+        SpatialJoinOptions {
+            spatial_join_reordering: false,
+            ..Default::default()
+        },
+        Some(100),
+        Some(10),
+        Some(100),
+        Some(10_000),
+        OriginalInputSide::Left,
+    )
+    .await
 }
 
 #[tokio::test]

Reply via email to