paleolimbot commented on code in PR #722:
URL: https://github.com/apache/sedona-db/pull/722#discussion_r3082601995


##########
rust/sedona-spatial-join/src/exec.rs:
##########
@@ -128,12 +129,22 @@ impl SpatialJoinExec {
         join_type: &JoinType,
         projection: Option<Vec<usize>>,
         options: &SpatialJoinOptions,
+        join_provider: Arc<dyn SpatialJoinProvider>,
     ) -> Result<Self> {

Review Comment:
   How about:
   
   ```rust
       pub fn with_spatial_join_provider(mut self, join_provider: Arc<dyn 
SpatialJoinProvider>) -> Self {
           self.join_provider = join_provider;
           self
       }
   ```
   
   ...to reduce the code changes for the common path of creating the default 
join and remove the clippy allow?



##########
rust/sedona-spatial-join/src/join_provider.rs:
##########
@@ -66,7 +66,7 @@ pub trait SpatialJoinProvider: std::fmt::Debug + Send + Sync {
 
 /// Default implementation of the [SpatialJoinProvider]
 #[derive(Debug)]
-pub(crate) struct DefaultSpatialJoinProvider;
+pub struct DefaultSpatialJoinProvider;

Review Comment:
   With the change I suggested above, I think this can stay crate-local.



##########
rust/sedona-spatial-join-gpu/src/physical_planner.rs:
##########
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::join_provider::GpuSpatialJoinProvider;
+use crate::options::GpuOptions;
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{DataFusionError, JoinSide, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::SpatialJoinOptions;
+use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
+use sedona_query_planner::spatial_join_physical_planner::{
+    PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{
+    RelationPredicate, SpatialPredicate, SpatialRelationType,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+use sedona_spatial_join::SpatialJoinExec;
+
+/// [SpatialJoinFactory] implementation for the default spatial join
+///
+/// This struct is the entrypoint to ensuring the SedonaQueryPlanner is able
+/// to instantiate the [ExecutionPlan] implemented in this crate.
+#[derive(Debug)]
+pub struct GpuSpatialJoinPhysicalPlanner;
+
+impl GpuSpatialJoinPhysicalPlanner {
+    /// Create a new default join factory
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for GpuSpatialJoinPhysicalPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SpatialJoinPhysicalPlanner for GpuSpatialJoinPhysicalPlanner {
+    fn plan_spatial_join(
+        &self,
+        args: &PlanSpatialJoinArgs<'_>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let supported = 
is_spatial_predicate_supported_on_gpu(args.spatial_predicate);
+        let gpu_options = args
+            .options
+            .extensions
+            .get::<GpuOptions>()
+            .cloned()
+            .unwrap_or_default();
+
+        if !gpu_options.enable {
+            return Ok(None);
+        }
+
+        if !supported {
+            if gpu_options.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(None);
+            } else {
+                return Err(DataFusionError::Plan("GPU spatial join is enabled, 
but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+
+        let should_swap = !matches!(
+            args.spatial_predicate,
+            SpatialPredicate::KNearestNeighbors(_)
+        ) && args.join_type.supports_swap()
+            && should_swap_join_order(
+                args.join_options,
+                args.physical_left.as_ref(),
+                args.physical_right.as_ref(),
+            )?;
+
+        // Repartition the probe side when enabled. This breaks spatial 
locality in sorted/skewed
+        // datasets, leading to more balanced workloads during out-of-core 
spatial join.
+        // We determine which pre-swap input will be the probe AFTER any 
potential swap, and
+        // repartition it here. swap_inputs() will then carry the 
RepartitionExec to the correct
+        // child position.
+        let (physical_left, physical_right) = if 
args.join_options.repartition_probe_side {
+            repartition_probe_side(
+                args.physical_left.clone(),
+                args.physical_right.clone(),
+                args.spatial_predicate,
+                should_swap,
+            )?
+        } else {
+            (args.physical_left.clone(), args.physical_right.clone())
+        };
+
+        let exec = SpatialJoinExec::try_new(
+            physical_left,
+            physical_right,
+            args.spatial_predicate.clone(),
+            args.remainder.cloned(),
+            args.join_type,
+            None,
+            args.join_options,
+            Arc::new(GpuSpatialJoinProvider::new(gpu_options)),
+        )?;
+
+        if should_swap {
+            exec.swap_inputs().map(Some)
+        } else {
+            Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
+        }
+    }
+}
+
+/// Spatial join reordering heuristic:
+/// 1. Put the input with fewer rows on the build side, because fewer entries
+///    produce a smaller and more efficient spatial index (R-tree).
+/// 2. If row-count statistics are unavailable (for example, for CSV sources),
+///    fall back to total input size as an estimate.
+/// 3. Do not swap the join order if join reordering is disabled or no relevant
+///    statistics are available.
+fn should_swap_join_order(
+    spatial_join_options: &SpatialJoinOptions,
+    left: &dyn ExecutionPlan,
+    right: &dyn ExecutionPlan,
+) -> Result<bool> {
+    if !spatial_join_options.spatial_join_reordering {
+        log::info!(
+            "spatial join swap heuristic disabled via 
sedona.spatial_join.spatial_join_reordering"
+        );
+        return Ok(false);
+    }
+
+    let left_stats = left.partition_statistics(None)?;
+    let right_stats = right.partition_statistics(None)?;
+
+    let left_num_rows = left_stats.num_rows;
+    let right_num_rows = right_stats.num_rows;
+    let left_total_byte_size = left_stats.total_byte_size;
+    let right_total_byte_size = right_stats.total_byte_size;
+
+    let should_swap = match (left_num_rows.get_value(), 
right_num_rows.get_value()) {
+        (Some(l), Some(r)) => l > r,
+        _ => match (
+            left_total_byte_size.get_value(),
+            right_total_byte_size.get_value(),
+        ) {
+            (Some(l), Some(r)) => l > r,
+            _ => false,
+        },
+    };
+
+    log::info!(
+        "spatial join swap heuristic: left_num_rows={left_num_rows:?}, 
right_num_rows={right_num_rows:?}, 
left_total_byte_size={left_total_byte_size:?}, 
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
+    );
+
+    Ok(should_swap)
+}
+
+/// Repartition the probe side of a spatial join using `RoundRobinBatch` 
partitioning.
+///
+/// The purpose is to break spatial locality in sorted or skewed datasets, 
which can cause
+/// imbalanced partitions when running out-of-core spatial join. The number of 
partitions is
+/// preserved; only the distribution of rows across partitions is shuffled.
+///
+/// The `should_swap` parameter indicates whether `swap_inputs()` will be 
called after
+/// `SpatialJoinExec` is constructed. This affects which pre-swap input will 
become the
+/// probe side:
+/// - For non-KNN predicates: probe is always `Right` after any swap. If 
`should_swap` is true,
+///   the current `left` will become `right` (probe) after swap, so we 
repartition `left`.
+/// - For KNN predicates: `should_swap` is always false, and the probe side is 
determined by
+///   `KNNPredicate::probe_side`.
+fn repartition_probe_side(
+    mut physical_left: Arc<dyn ExecutionPlan>,
+    mut physical_right: Arc<dyn ExecutionPlan>,
+    spatial_predicate: &SpatialPredicate,
+    should_swap: bool,
+) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
+    let probe_plan = match spatial_predicate {
+        SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
+            JoinSide::Left => &mut physical_left,
+            JoinSide::Right => &mut physical_right,
+            JoinSide::None => {
+                // KNNPredicate::probe_side is asserted not to be None in its 
constructor;
+                // treat this as a debug-only invariant violation and default 
to right.
+                debug_assert!(false, "KNNPredicate::probe_side must not be 
JoinSide::None");
+                &mut physical_right
+            }
+        },
+        _ => {
+            // For Relation/Distance predicates, probe is always Right after 
swap.
+            // If should_swap, the current left will be moved to the right 
(probe) by swap_inputs().
+            if should_swap {
+                &mut physical_left
+            } else {
+                &mut physical_right
+            }
+        }
+    };
+
+    *probe_plan = Arc::new(ProbeShuffleExec::try_new(Arc::clone(probe_plan))?);
+
+    Ok((physical_left, physical_right))
+}
+
+pub fn is_spatial_predicate_supported(
+    spatial_predicate: &SpatialPredicate,
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Result<bool> {
+    fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema: 
&Schema) -> Result<bool> {
+        let return_field = expr.return_field(schema)?;

Review Comment:
   I don't think you need both `is_spatial_predicate_supported()` and 
`is_spatial_predicate_supported_on_gpu()`: they accept the same set of relation types, so could they be merged into one?



##########
rust/sedona-spatial-join-gpu/src/physical_planner.rs:
##########
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::join_provider::GpuSpatialJoinProvider;
+use crate::options::GpuOptions;
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{DataFusionError, JoinSide, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::SpatialJoinOptions;
+use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
+use sedona_query_planner::spatial_join_physical_planner::{
+    PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{
+    RelationPredicate, SpatialPredicate, SpatialRelationType,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+use sedona_spatial_join::SpatialJoinExec;
+
+/// [SpatialJoinFactory] implementation for the default spatial join
+///
+/// This struct is the entrypoint to ensuring the SedonaQueryPlanner is able
+/// to instantiate the [ExecutionPlan] implemented in this crate.
+#[derive(Debug)]
+pub struct GpuSpatialJoinPhysicalPlanner;
+
+impl GpuSpatialJoinPhysicalPlanner {
+    /// Create a new default join factory
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for GpuSpatialJoinPhysicalPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SpatialJoinPhysicalPlanner for GpuSpatialJoinPhysicalPlanner {
+    fn plan_spatial_join(
+        &self,
+        args: &PlanSpatialJoinArgs<'_>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let supported = 
is_spatial_predicate_supported_on_gpu(args.spatial_predicate);
+        let gpu_options = args
+            .options
+            .extensions
+            .get::<GpuOptions>()
+            .cloned()
+            .unwrap_or_default();
+
+        if !gpu_options.enable {
+            return Ok(None);
+        }
+
+        if !supported {
+            if gpu_options.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(None);
+            } else {
+                return Err(DataFusionError::Plan("GPU spatial join is enabled, 
but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+
+        let should_swap = !matches!(
+            args.spatial_predicate,
+            SpatialPredicate::KNearestNeighbors(_)
+        ) && args.join_type.supports_swap()
+            && should_swap_join_order(
+                args.join_options,
+                args.physical_left.as_ref(),
+                args.physical_right.as_ref(),
+            )?;
+
+        // Repartition the probe side when enabled. This breaks spatial 
locality in sorted/skewed
+        // datasets, leading to more balanced workloads during out-of-core 
spatial join.
+        // We determine which pre-swap input will be the probe AFTER any 
potential swap, and
+        // repartition it here. swap_inputs() will then carry the 
RepartitionExec to the correct
+        // child position.
+        let (physical_left, physical_right) = if 
args.join_options.repartition_probe_side {
+            repartition_probe_side(
+                args.physical_left.clone(),
+                args.physical_right.clone(),
+                args.spatial_predicate,
+                should_swap,
+            )?
+        } else {
+            (args.physical_left.clone(), args.physical_right.clone())
+        };

Review Comment:
   You're in control of this code now, so maybe you just want to turn off probe-side 
repartitioning for the GPU join, since it's slowing you down? You could then remove the 
`repartition_probe_side()` helper below to keep your join planner a bit more focused.



##########
rust/sedona-spatial-join-gpu/src/physical_planner.rs:
##########
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::join_provider::GpuSpatialJoinProvider;
+use crate::options::GpuOptions;
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{DataFusionError, JoinSide, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::SpatialJoinOptions;
+use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
+use sedona_query_planner::spatial_join_physical_planner::{
+    PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{
+    RelationPredicate, SpatialPredicate, SpatialRelationType,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+use sedona_spatial_join::SpatialJoinExec;
+
+/// [SpatialJoinFactory] implementation for the default spatial join
+///
+/// This struct is the entrypoint to ensuring the SedonaQueryPlanner is able
+/// to instantiate the [ExecutionPlan] implemented in this crate.
+#[derive(Debug)]
+pub struct GpuSpatialJoinPhysicalPlanner;
+
+impl GpuSpatialJoinPhysicalPlanner {
+    /// Create a new default join factory
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for GpuSpatialJoinPhysicalPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SpatialJoinPhysicalPlanner for GpuSpatialJoinPhysicalPlanner {
+    fn plan_spatial_join(
+        &self,
+        args: &PlanSpatialJoinArgs<'_>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let supported = 
is_spatial_predicate_supported_on_gpu(args.spatial_predicate);
+        let gpu_options = args
+            .options
+            .extensions
+            .get::<GpuOptions>()
+            .cloned()
+            .unwrap_or_default();
+
+        if !gpu_options.enable {
+            return Ok(None);
+        }
+
+        if !supported {
+            if gpu_options.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(None);
+            } else {
+                return Err(DataFusionError::Plan("GPU spatial join is enabled, 
but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+
+        let should_swap = !matches!(
+            args.spatial_predicate,
+            SpatialPredicate::KNearestNeighbors(_)
+        ) && args.join_type.supports_swap()
+            && should_swap_join_order(
+                args.join_options,
+                args.physical_left.as_ref(),
+                args.physical_right.as_ref(),
+            )?;
+
+        // Repartition the probe side when enabled. This breaks spatial 
locality in sorted/skewed
+        // datasets, leading to more balanced workloads during out-of-core 
spatial join.
+        // We determine which pre-swap input will be the probe AFTER any 
potential swap, and
+        // repartition it here. swap_inputs() will then carry the 
RepartitionExec to the correct
+        // child position.
+        let (physical_left, physical_right) = if 
args.join_options.repartition_probe_side {
+            repartition_probe_side(
+                args.physical_left.clone(),
+                args.physical_right.clone(),
+                args.spatial_predicate,
+                should_swap,
+            )?
+        } else {
+            (args.physical_left.clone(), args.physical_right.clone())
+        };
+
+        let exec = SpatialJoinExec::try_new(
+            physical_left,
+            physical_right,
+            args.spatial_predicate.clone(),
+            args.remainder.cloned(),
+            args.join_type,
+            None,
+            args.join_options,
+            Arc::new(GpuSpatialJoinProvider::new(gpu_options)),
+        )?;
+
+        if should_swap {
+            exec.swap_inputs().map(Some)
+        } else {
+            Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
+        }
+    }
+}
+
+/// Spatial join reordering heuristic:
+/// 1. Put the input with fewer rows on the build side, because fewer entries
+///    produce a smaller and more efficient spatial index (R-tree).
+/// 2. If row-count statistics are unavailable (for example, for CSV sources),
+///    fall back to total input size as an estimate.
+/// 3. Do not swap the join order if join reordering is disabled or no relevant
+///    statistics are available.
+fn should_swap_join_order(
+    spatial_join_options: &SpatialJoinOptions,
+    left: &dyn ExecutionPlan,
+    right: &dyn ExecutionPlan,
+) -> Result<bool> {
+    if !spatial_join_options.spatial_join_reordering {
+        log::info!(
+            "spatial join swap heuristic disabled via 
sedona.spatial_join.spatial_join_reordering"
+        );
+        return Ok(false);
+    }
+
+    let left_stats = left.partition_statistics(None)?;
+    let right_stats = right.partition_statistics(None)?;
+
+    let left_num_rows = left_stats.num_rows;
+    let right_num_rows = right_stats.num_rows;
+    let left_total_byte_size = left_stats.total_byte_size;
+    let right_total_byte_size = right_stats.total_byte_size;
+
+    let should_swap = match (left_num_rows.get_value(), 
right_num_rows.get_value()) {
+        (Some(l), Some(r)) => l > r,
+        _ => match (
+            left_total_byte_size.get_value(),
+            right_total_byte_size.get_value(),
+        ) {
+            (Some(l), Some(r)) => l > r,
+            _ => false,
+        },
+    };
+
+    log::info!(
+        "spatial join swap heuristic: left_num_rows={left_num_rows:?}, 
right_num_rows={right_num_rows:?}, 
left_total_byte_size={left_total_byte_size:?}, 
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
+    );
+
+    Ok(should_swap)
+}
+
+/// Repartition the probe side of a spatial join using `RoundRobinBatch` 
partitioning.
+///
+/// The purpose is to break spatial locality in sorted or skewed datasets, 
which can cause
+/// imbalanced partitions when running out-of-core spatial join. The number of 
partitions is
+/// preserved; only the distribution of rows across partitions is shuffled.
+///
+/// The `should_swap` parameter indicates whether `swap_inputs()` will be 
called after
+/// `SpatialJoinExec` is constructed. This affects which pre-swap input will 
become the
+/// probe side:
+/// - For non-KNN predicates: probe is always `Right` after any swap. If 
`should_swap` is true,
+///   the current `left` will become `right` (probe) after swap, so we 
repartition `left`.
+/// - For KNN predicates: `should_swap` is always false, and the probe side is 
determined by
+///   `KNNPredicate::probe_side`.
+fn repartition_probe_side(
+    mut physical_left: Arc<dyn ExecutionPlan>,
+    mut physical_right: Arc<dyn ExecutionPlan>,
+    spatial_predicate: &SpatialPredicate,
+    should_swap: bool,
+) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
+    let probe_plan = match spatial_predicate {
+        SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
+            JoinSide::Left => &mut physical_left,
+            JoinSide::Right => &mut physical_right,
+            JoinSide::None => {
+                // KNNPredicate::probe_side is asserted not to be None in its 
constructor;
+                // treat this as a debug-only invariant violation and default 
to right.
+                debug_assert!(false, "KNNPredicate::probe_side must not be 
JoinSide::None");
+                &mut physical_right
+            }
+        },
+        _ => {
+            // For Relation/Distance predicates, probe is always Right after 
swap.
+            // If should_swap, the current left will be moved to the right 
(probe) by swap_inputs().
+            if should_swap {
+                &mut physical_left
+            } else {
+                &mut physical_right
+            }
+        }
+    };
+
+    *probe_plan = Arc::new(ProbeShuffleExec::try_new(Arc::clone(probe_plan))?);
+
+    Ok((physical_left, physical_right))
+}
+
+pub fn is_spatial_predicate_supported(
+    spatial_predicate: &SpatialPredicate,
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Result<bool> {
+    fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema: 
&Schema) -> Result<bool> {
+        let return_field = expr.return_field(schema)?;
+        let sedona_type = SedonaType::from_storage_field(&return_field)?;
+        Ok(ArgMatcher::is_geometry().match_type(&sedona_type))
+    }
+
+    let both_geometry =
+        |left: &Arc<dyn PhysicalExpr>, right: &Arc<dyn PhysicalExpr>| -> 
Result<bool> {
+            Ok(is_geometry_type_supported(left, left_schema)?
+                && is_geometry_type_supported(right, right_schema)?)
+        };
+
+    match spatial_predicate {
+        SpatialPredicate::Relation(RelationPredicate {
+            left,
+            right,
+            relation_type,
+        }) => {
+            if !matches!(
+                relation_type,
+                SpatialRelationType::Intersects
+                    | SpatialRelationType::Contains
+                    | SpatialRelationType::Within
+                    | SpatialRelationType::Covers
+                    | SpatialRelationType::CoveredBy
+                    | SpatialRelationType::Touches
+                    | SpatialRelationType::Equals
+            ) {
+                return Ok(false);
+            }
+
+            both_geometry(left, right)
+        }
+        SpatialPredicate::Distance(_) | SpatialPredicate::KNearestNeighbors(_) 
=> Ok(false),
+    }
+}
+
+fn is_spatial_predicate_supported_on_gpu(spatial_predicate: &SpatialPredicate) 
-> bool {
+    match spatial_predicate {
+        SpatialPredicate::Relation(rel) => match rel.relation_type {
+            SpatialRelationType::Intersects => true,
+            SpatialRelationType::Contains => true,
+            SpatialRelationType::Within => true,
+            SpatialRelationType::Covers => true,
+            SpatialRelationType::CoveredBy => true,
+            SpatialRelationType::Touches => true,
+            SpatialRelationType::Crosses => false,
+            SpatialRelationType::Overlaps => false,
+            SpatialRelationType::Equals => true,
+        },
+        SpatialPredicate::Distance(_) => false,
+        SpatialPredicate::KNearestNeighbors(_) => false,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use arrow_schema::{DataType, Field};
+    use datafusion_physical_expr::expressions::Column;
+    use sedona_query_planner::spatial_predicate::{KNNPredicate, 
SpatialRelationType};
+    use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};
+
+    #[test]
+    fn test_is_spatial_predicate_supported() {
+        // Planar geometry field
+        let geom_field = WKB_GEOMETRY.to_storage_field("geom", false).unwrap();

Review Comment:
   I am not sure you need these tests (or, if you do, perhaps tune them to your 
GPU-specific requirements?)



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs:
##########
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::gpu_spatial_index::GPUSpatialIndex;
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use futures::StreamExt;
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions, 
GpuSpatialRefiner};
+use 
sedona_spatial_join::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::utils::join_utils::need_produce_result_in_final;
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub(crate) struct GPUSpatialIndexBuilder {
+    schema: SchemaRef,
+    spatial_predicate: SpatialPredicate,
+    options: SpatialJoinOptions,
+    join_type: JoinType,
+    probe_threads_count: usize,
+    metrics: SpatialJoinBuildMetrics,
+    /// Batches to be indexed
+    indexed_batches: Vec<EvaluatedBatch>,
+    /// Statistics for indexed geometries
+    memory_used: usize,
+}
+
+impl GPUSpatialIndexBuilder {
+    pub(crate) fn is_using_gpu(
+        spatial_predicate: &SpatialPredicate,
+        join_opts: &SpatialJoinOptions,
+    ) -> Result<bool> {
+        if join_opts.gpu.enable {
+            if Self::is_spatial_predicate_supported_on_gpu(spatial_predicate) {
+                return Ok(true);
+            } else if join_opts.gpu.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(false);
+            } else {
+                return Err(DataFusionError::Execution("GPU spatial join is 
enabled, but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+        Ok(false)
+    }
+
+    fn is_spatial_predicate_supported_on_gpu(spatial_predicate: 
&SpatialPredicate) -> bool {
+        match spatial_predicate {
+            SpatialPredicate::Relation(rel) => match rel.relation_type {
+                SpatialRelationType::Intersects => true,
+                SpatialRelationType::Contains => true,
+                SpatialRelationType::Within => true,
+                SpatialRelationType::Covers => true,
+                SpatialRelationType::CoveredBy => true,
+                SpatialRelationType::Touches => true,
+                SpatialRelationType::Crosses => false,
+                SpatialRelationType::Overlaps => false,
+                SpatialRelationType::Equals => true,
+            },
+            SpatialPredicate::Distance(_) => false,
+            SpatialPredicate::KNearestNeighbors(_) => false,
+        }
+    }
+    pub fn new(
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Self {
+        Self {
+            schema,
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+            indexed_batches: vec![],
+            memory_used: 0,
+        }
+    }
+
+    fn build_visited_bitmaps(&mut self) -> 
Result<Option<Mutex<Vec<BooleanBufferBuilder>>>> {
+        if !need_produce_result_in_final(self.join_type) {
+            return Ok(None);
+        }
+
+        let mut bitmaps = Vec::with_capacity(self.indexed_batches.len());
+        let mut total_buffer_size = 0;
+
+        for batch in &self.indexed_batches {
+            let batch_rows = batch.batch.num_rows();
+            let buffer_size = batch_rows.div_ceil(8);
+            total_buffer_size += buffer_size;
+
+            let mut bitmap = BooleanBufferBuilder::new(batch_rows);
+            bitmap.append_n(batch_rows, false);
+            bitmaps.push(bitmap);
+        }
+
+        self.record_memory_usage(total_buffer_size);
+
+        Ok(Some(Mutex::new(bitmaps)))
+    }
+
+    fn record_memory_usage(&mut self, bytes: usize) {
+        self.memory_used += bytes;
+        self.metrics.build_mem_used.set_max(self.memory_used);
+    }
+    /// Add a geometry batch to be indexed.
+    ///
+    /// This method accumulates geometry batches that will be used to build 
the spatial index.
+    /// Each batch contains processed geometry data along with memory usage 
information.
+    fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
+        let in_mem_size = indexed_batch.in_mem_size()?;
+        self.indexed_batches.push(indexed_batch);
+        self.record_memory_usage(in_mem_size);
+        Ok(())
+    }
+
+    pub(crate) fn estimate_extra_memory_usage(
+        geo_stats: &GeoStatistics,
+        _spatial_predicate: &SpatialPredicate,
+        _options: &SpatialJoinOptions,
+    ) -> usize {
+        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+        // Each geometry requires 4 f32 values to store the bounding rectangle 
(min_x, min_y, max_x, max_y)
+        num_geoms * (4 * 4)
+    }
+}
+fn concat_evaluated_batches(batches: &[EvaluatedBatch]) -> 
Result<EvaluatedBatch> {
+    if batches.is_empty() {
+        return sedona_internal_err!("Cannot concatenate empty list of 
EvaluatedBatches");
+    }
+
+    // 1. Concatenate the underlying Arrow RecordBatches
+    let schema = batches[0].schema();
+    let record_batches: Vec<&RecordBatch> = batches.iter().map(|b| 
&b.batch).collect();
+    let concatenated_batch = concat_batches(&schema, record_batches)?;
+
+    // 2. Prepare for Geometry Interleaving
+    // We need to create a list of (batch_index, row_index) for every row in 
order
+    let mut indices = Vec::with_capacity(concatenated_batch.num_rows());
+    for (batch_idx, batch) in batches.iter().enumerate() {
+        for row_idx in 0..batch.num_rows() {
+            indices.push((batch_idx, row_idx));
+        }
+    }
+
+    // 3. Concatenate Geometry Arrays using the interleave method
+    let geom_arrays: Vec<&EvaluatedGeometryArray> = batches.iter().map(|b| 
&b.geom_array).collect();
+
+    let concatenated_geom_array = 
EvaluatedGeometryArray::interleave(&geom_arrays, &indices)?;

Review Comment:
   Since you've taken the time to diagnose this as significant, it is worth 
opening an issue and adding a comment here in case neither of us remembers 🙂 



##########
rust/sedona-spatial-join-gpu/src/join_provider.rs:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::GpuSpatialIndexBuilder;
+use arrow_array::ArrayRef;
+use arrow_schema::SchemaRef;
+use datafusion_common::JoinType;
+use datafusion_common::Result;
+use geo_index::rtree::util::f64_box_to_f32;
+use geo_types::{coord, Rect};
+use sedona_common::SpatialJoinOptions;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::executor::IterGeo;
+use sedona_geo_generic_alg::BoundingRect;
+use sedona_schema::datatypes::SedonaType;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::join_provider::SpatialJoinProvider;
+use sedona_spatial_join::operand_evaluator::{
+    EvaluatedGeometryArray, EvaluatedGeometryArrayFactory,
+};
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::Arc;
+use wkb::reader::GeometryType;
+
+#[derive(Debug)]
+pub(crate) struct GpuSpatialJoinProvider;
+
+impl SpatialJoinProvider for GpuSpatialJoinProvider {
+    fn try_new_spatial_index_builder(
+        &self,
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Result<Box<dyn SpatialIndexBuilder>> {
+        let builder = GpuSpatialIndexBuilder::new(
+            schema,
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+        );
+        Ok(Box::new(builder))
+    }
+
+    fn estimate_extra_memory_usage(
+        &self,
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        GpuSpatialIndexBuilder::estimate_extra_memory_usage(geo_stats, 
spatial_predicate, options)
+    }
+
+    fn evaluated_array_factory(&self) -> Arc<dyn 
EvaluatedGeometryArrayFactory> {
+        Arc::new(DefaultGeometryArrayFactory)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct DefaultGeometryArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for DefaultGeometryArrayFactory {
+    fn try_new_evaluated_array(
+        &self,
+        geometry_array: ArrayRef,
+        sedona_type: &SedonaType,
+    ) -> Result<EvaluatedGeometryArray> {
+        let num_rows = geometry_array.len();
+        let mut rect_vec = Vec::with_capacity(num_rows);
+        geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
+            let rect_opt = if let Some(wkb) = &wkb_opt {
+                if let Some(rect) = wkb.bounding_rect() {
+                    let min = rect.min();
+                    let max = rect.max();
+                    // Why conservative bounding boxes prevent false negatives:
+                    // 1. P32 = round_nearest(P64), so P32 is the closest 
possible float to P64.
+                    // 2. Min32 = round_down(Min64), guaranteeing Min32 <= 
Min64.
+                    // 3. Max32 = round_up(Max64), guaranteeing Max32 >= Max64.
+                    // If P64 is inside Box64 (Min64 <= P64 <= Max64), P32 
cannot fall outside Box32.
+                    // If P32 < Min32, it would mean Min32 is closer to P64 
than P32 is, which
+                    // contradicts P32 being the nearest float. Therefore, 
false negatives are impossible.

Review Comment:
   Can you add a comment noting that this is an optimization for points, and 
that you're referring to the rounding that occurs in the spatial index (or 
something else, if I've misunderstood)? As it stands, this comment doesn't make 
sense, and somebody is likely to come through here and make it "more correct" if 
it isn't explained properly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to