paleolimbot commented on code in PR #722:
URL: https://github.com/apache/sedona-db/pull/722#discussion_r3059920000


##########
rust/sedona-spatial-join-gpu/src/physical_planner.rs:
##########
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::index::GpuSpatialIndexBuilder;
+use crate::join_provider::GpuSpatialJoinProvider;
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{JoinSide, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
+use sedona_query_planner::spatial_join_physical_planner::{
+    PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{
+    DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+use sedona_spatial_join::join_provider::{DefaultSpatialJoinProvider, 
SpatialJoinProvider};
+use sedona_spatial_join::SpatialJoinExec;
+
+/// [SpatialJoinFactory] implementation for the default spatial join
+///
+/// This struct is the entrypoint to ensuring the SedonaQueryPlanner is able
+/// to instantiate the [ExecutionPlan] implemented in this crate.
+#[derive(Debug)]
+pub struct GpuSpatialJoinPhysicalPlanner;
+
+impl GpuSpatialJoinPhysicalPlanner {
+    /// Create a new default join factory
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for GpuSpatialJoinPhysicalPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SpatialJoinPhysicalPlanner for GpuSpatialJoinPhysicalPlanner {
+    fn plan_spatial_join(
+        &self,
+        args: &PlanSpatialJoinArgs<'_>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if !is_spatial_predicate_supported(
+            args.spatial_predicate,
+            &args.physical_left.schema(),
+            &args.physical_right.schema(),
+        )? {
+            return Ok(None);
+        }

Review Comment:
   Should you check the fallback option here and error if it is false?



##########
rust/sedona/src/context.rs:
##########
@@ -149,11 +149,24 @@ impl SedonaContext {
         let mut planner = SedonaQueryPlanner::new();
         #[cfg(feature = "spatial-join")]
         {
-            use 
sedona_spatial_join::physical_planner::DefaultSpatialJoinPhysicalPlanner;
+            #[cfg(not(feature = "gpu"))]

Review Comment:
   ```suggestion
   ```
   
   I believe that we always want the default spatial join in place here...this 
is how fallback works when the GPU planner returns `None`.



##########
rust/sedona-spatial-join/src/index/default_spatial_index.rs:
##########
@@ -283,6 +283,7 @@ impl SpatialIndex for DefaultSpatialIndex {
         self.inner.schema.clone()
     }
 
+    #[allow(dead_code)]

Review Comment:
   ```suggestion
   ```
   
   I think this can be removed now because the trait is pub?



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs:
##########
@@ -0,0 +1,765 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::BooleanBufferBuilder;
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::{DataFusionError, Result};
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{ExecutionMode, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{
+    GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner, 
GpuSpatialRelationPredicate,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndex;
+use sedona_spatial_join::index::QueryResultMetrics;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::SpatialPredicate;
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use wkb::reader::Wkb;
+
+pub struct GPUSpatialIndex {
+    pub(crate) schema: SchemaRef,
+    pub(crate) _options: SpatialJoinOptions,
+    /// GPU spatial index for performing GPU-accelerated filtering
+    pub(crate) index: Arc<GpuSpatialIndex>,
+    /// GPU spatial refiner for performing GPU-accelerated refinement
+    pub(crate) refiner: Arc<GpuSpatialRefiner>,
+    pub(crate) spatial_predicate: SpatialPredicate,
+    /// Indexed batches containing evaluated geometry arrays. It contains the 
original record
+    /// batches and geometry arrays obtained by evaluating the geometry 
expression on the build side.
+    pub(crate) indexed_batches: Vec<EvaluatedBatch>,
+    /// An array for translating data index to geometry batch index and row 
index
+    pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>,
+    /// Shared bitmap builders for visited left indices, one per batch
+    pub(crate) visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+    /// Counter of running probe-threads, potentially able to update `bitmap`.
+    /// Each time a probe thread finished probing the index, it will decrement 
the counter.
+    /// The last finished probe thread will produce the extra output batches 
for unmatched
+    /// build side when running left-outer joins. See also 
[`report_probe_completed`].
+    pub(crate) probe_threads_counter: AtomicUsize,
+}
+impl GPUSpatialIndex {
+    pub fn empty(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        options: SpatialJoinOptions,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        let gpu_options = GpuSpatialOptions {
+            cuda_use_memory_pool: options.gpu.use_memory_pool,
+            cuda_memory_pool_init_percent: 
options.gpu.memory_pool_init_percentage as i32,
+            concurrency: 1,
+            device_id: options.gpu.device_id as i32,
+            compress_bvh: options.gpu.compress_bvh,
+            pipeline_batches: options.gpu.pipeline_batches as u32,
+        };
+
+        Ok(Self {
+            schema,
+            _options: options,
+            spatial_predicate,
+            index: Arc::new(
+                GpuSpatialIndex::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            refiner: Arc::new(
+                GpuSpatialRefiner::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            indexed_batches: vec![],
+            data_id_to_batch_pos: vec![],
+            visited_build_side: None,
+            probe_threads_counter,
+        })
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        options: SpatialJoinOptions,
+        index: Arc<GpuSpatialIndex>,
+        refiner: Arc<GpuSpatialRefiner>,
+        indexed_batches: Vec<EvaluatedBatch>,
+        data_id_to_batch_pos: Vec<(i32, i32)>,
+        visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        Ok(Self {
+            schema,
+            _options: options,
+            spatial_predicate,
+            index,
+            refiner,
+            indexed_batches,
+            data_id_to_batch_pos,
+            visited_build_side,
+            probe_threads_counter,
+        })
+    }
+
+    fn refine(
+        &self,
+        probe_geoms: &ArrayRef,
+        predicate: &SpatialPredicate,
+        build_indices: &mut Vec<u32>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<()> {
+        match predicate {
+            SpatialPredicate::Relation(rel_p) => {
+                self.refiner
+                    .refine(
+                        probe_geoms,
+                        Self::convert_relation_type(&rel_p.relation_type)?,
+                        build_indices,
+                        probe_indices,
+                    )
+                    .map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "GPU spatial refinement failed: {:?}",
+                            e
+                        ))
+                    })?;
+                Ok(())
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Only Relation predicate is supported for GPU spatial 
query".to_string(),
+            )),
+        }
+    }
+    // Translate Sedona SpatialRelationType to GpuSpatialRelationPredicate
+    fn convert_relation_type(t: &SpatialRelationType) -> 
Result<GpuSpatialRelationPredicate> {
+        match t {
+            SpatialRelationType::Equals => 
Ok(GpuSpatialRelationPredicate::Equals),
+            SpatialRelationType::Touches => 
Ok(GpuSpatialRelationPredicate::Touches),
+            SpatialRelationType::Contains => 
Ok(GpuSpatialRelationPredicate::Contains),
+            SpatialRelationType::Covers => 
Ok(GpuSpatialRelationPredicate::Covers),
+            SpatialRelationType::Intersects => 
Ok(GpuSpatialRelationPredicate::Intersects),
+            SpatialRelationType::Within => 
Ok(GpuSpatialRelationPredicate::Within),
+            SpatialRelationType::CoveredBy => 
Ok(GpuSpatialRelationPredicate::CoveredBy),
+            _ => {
+                // This should not happen as we check for supported predicates 
earlier
+                Err(DataFusionError::Execution(format!(
+                    "Unsupported spatial relation type for GPU: {:?}",
+                    t
+                )))
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl SpatialIndex for GPUSpatialIndex {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    #[allow(dead_code)] // This is used for tests
+    fn num_indexed_batches(&self) -> usize {
+        self.indexed_batches.len()
+    }
+    fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch {
+        &self.indexed_batches[batch_idx].batch
+    }
+
+    /// This method implements [`SpatialIndex::query_batch`] with 
GPU-accelerated spatial filtering
+    /// and refinement. It takes a batch of probe geometries and a range of 
row indices to process,
+    /// performs a spatial query on the GPU to find candidate matches,
+    /// refines the candidates using the GPU refiner,
+    /// and returns the matching build batch positions and probe indices.
+    async fn query_batch(
+        &self,
+        evaluated_batch: &Arc<EvaluatedBatch>,
+        range: Range<usize>,
+        _max_result_size: usize,
+        build_batch_positions: &mut Vec<(i32, i32)>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<(QueryResultMetrics, usize)> {
+        if range.is_empty() {
+            return Ok((
+                QueryResultMetrics {
+                    count: 0,
+                    candidate_count: 0,
+                },
+                range.start,
+            ));
+        }
+        let index = &self.index.as_ref();
+
+        let empty_rect = Rect::new(
+            coord!(x: f32::NAN, y: f32::NAN),
+            coord!(x: f32::NAN, y: f32::NAN),
+        );
+        let rects: Vec<_> = range
+            .clone()
+            .map(|row_idx| 
evaluated_batch.geom_array.rects()[row_idx].unwrap_or(empty_rect))
+            .collect();
+
+        let (mut gpu_build_indices, mut gpu_probe_indices) =
+            index.probe(rects.as_ref()).map_err(|e| {
+                DataFusionError::Execution(format!("GPU spatial query failed: 
{:?}", e))
+            })?;
+
+        assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+        let candidate_count = gpu_build_indices.len();
+
+        self.refine(
+            evaluated_batch.geom_array.geometry_array(),
+            &self.spatial_predicate,
+            &mut gpu_build_indices,
+            &mut gpu_probe_indices,
+        )?;
+
+        assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+        let total_count = gpu_build_indices.len();
+
+        for (build_idx, probe_idx) in 
gpu_build_indices.iter().zip(gpu_probe_indices.iter()) {
+            let data_id = *build_idx as usize;
+            let (batch_idx, row_idx) = self.data_id_to_batch_pos[data_id];
+            build_batch_positions.push((batch_idx, row_idx));
+            probe_indices.push(range.start as u32 + probe_idx);
+        }
+        Ok((
+            QueryResultMetrics {
+                count: total_count,
+                candidate_count,
+            },
+            range.end,
+        ))
+    }
+    fn need_more_probe_stats(&self) -> bool {
+        false
+    }
+
+    fn merge_probe_stats(&self, stats: GeoStatistics) {
+        let _ = stats;
+    }
+
+    fn visited_build_side(&self) -> Option<&Mutex<Vec<BooleanBufferBuilder>>> {
+        self.visited_build_side.as_ref()
+    }
+
+    fn report_probe_completed(&self) -> bool {
+        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
+    }
+
+    fn get_refiner_mem_usage(&self) -> usize {
+        0
+    }
+
+    fn get_actual_execution_mode(&self) -> ExecutionMode {
+        ExecutionMode::PrepareBuild // GPU-based spatial index is always on 
PrepareBuild mode
+    }
+
+    fn query_knn(
+        &self,
+        _probe_wkb: &Wkb,
+        _k: u32,
+        _use_spheroid: bool,
+        _include_tie_breakers: bool,
+        _build_batch_positions: &mut Vec<(i32, i32)>,
+        _distances: Option<&mut Vec<f64>>,
+    ) -> Result<QueryResultMetrics> {
+        Err(DataFusionError::NotImplemented(
+            "KNN query is not implemented for GPU spatial index".to_string(),
+        ))
+    }
+}
+
+#[cfg(test)]
+#[cfg(feature = "gpu")]
+mod tests {
+    use crate::index::gpu_spatial_index_builder::GPUSpatialIndexBuilder;
+    use arrow_array::RecordBatch;
+    use arrow_schema::{DataType, Field, SchemaRef};
+    use datafusion_common::JoinType;
+    use datafusion_physical_expr::expressions::Column;
+    use futures::Stream;

Review Comment:
   For #702 we have to move all of these tests into `rust/sedona/tests` to 
avoid circular dev dependencies, where we can deduplicate the common pieces 
with sedona-spatial-join.



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs:
##########
@@ -0,0 +1,765 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::BooleanBufferBuilder;
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::{DataFusionError, Result};
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{ExecutionMode, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{
+    GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner, 
GpuSpatialRelationPredicate,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndex;
+use sedona_spatial_join::index::QueryResultMetrics;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::SpatialPredicate;
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use wkb::reader::Wkb;
+
+pub struct GPUSpatialIndex {
+    pub(crate) schema: SchemaRef,
+    pub(crate) _options: SpatialJoinOptions,
+    /// GPU spatial index for performing GPU-accelerated filtering
+    pub(crate) index: Arc<GpuSpatialIndex>,
+    /// GPU spatial refiner for performing GPU-accelerated refinement
+    pub(crate) refiner: Arc<GpuSpatialRefiner>,
+    pub(crate) spatial_predicate: SpatialPredicate,
+    /// Indexed batches containing evaluated geometry arrays. It contains the 
original record
+    /// batches and geometry arrays obtained by evaluating the geometry 
expression on the build side.
+    pub(crate) indexed_batches: Vec<EvaluatedBatch>,
+    /// An array for translating data index to geometry batch index and row 
index
+    pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>,
+    /// Shared bitmap builders for visited left indices, one per batch
+    pub(crate) visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+    /// Counter of running probe-threads, potentially able to update `bitmap`.
+    /// Each time a probe thread finished probing the index, it will decrement 
the counter.
+    /// The last finished probe thread will produce the extra output batches 
for unmatched
+    /// build side when running left-outer joins. See also 
[`report_probe_completed`].
+    pub(crate) probe_threads_counter: AtomicUsize,
+}
+impl GPUSpatialIndex {
+    pub fn empty(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        options: SpatialJoinOptions,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        let gpu_options = GpuSpatialOptions {
+            cuda_use_memory_pool: options.gpu.use_memory_pool,
+            cuda_memory_pool_init_percent: 
options.gpu.memory_pool_init_percentage as i32,
+            concurrency: 1,
+            device_id: options.gpu.device_id as i32,
+            compress_bvh: options.gpu.compress_bvh,
+            pipeline_batches: options.gpu.pipeline_batches as u32,
+        };
+
+        Ok(Self {
+            schema,
+            _options: options,
+            spatial_predicate,
+            index: Arc::new(
+                GpuSpatialIndex::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            refiner: Arc::new(
+                GpuSpatialRefiner::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            indexed_batches: vec![],
+            data_id_to_batch_pos: vec![],
+            visited_build_side: None,
+            probe_threads_counter,
+        })
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        options: SpatialJoinOptions,
+        index: Arc<GpuSpatialIndex>,
+        refiner: Arc<GpuSpatialRefiner>,
+        indexed_batches: Vec<EvaluatedBatch>,
+        data_id_to_batch_pos: Vec<(i32, i32)>,
+        visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        Ok(Self {
+            schema,
+            _options: options,
+            spatial_predicate,
+            index,
+            refiner,
+            indexed_batches,
+            data_id_to_batch_pos,
+            visited_build_side,
+            probe_threads_counter,
+        })
+    }
+
+    fn refine(
+        &self,
+        probe_geoms: &ArrayRef,
+        predicate: &SpatialPredicate,
+        build_indices: &mut Vec<u32>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<()> {
+        match predicate {
+            SpatialPredicate::Relation(rel_p) => {
+                self.refiner
+                    .refine(
+                        probe_geoms,
+                        Self::convert_relation_type(&rel_p.relation_type)?,
+                        build_indices,
+                        probe_indices,
+                    )
+                    .map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "GPU spatial refinement failed: {:?}",
+                            e
+                        ))
+                    })?;
+                Ok(())
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Only Relation predicate is supported for GPU spatial 
query".to_string(),
+            )),
+        }
+    }
+    // Translate Sedona SpatialRelationType to GpuSpatialRelationPredicate
+    fn convert_relation_type(t: &SpatialRelationType) -> 
Result<GpuSpatialRelationPredicate> {
+        match t {
+            SpatialRelationType::Equals => 
Ok(GpuSpatialRelationPredicate::Equals),
+            SpatialRelationType::Touches => 
Ok(GpuSpatialRelationPredicate::Touches),
+            SpatialRelationType::Contains => 
Ok(GpuSpatialRelationPredicate::Contains),
+            SpatialRelationType::Covers => 
Ok(GpuSpatialRelationPredicate::Covers),
+            SpatialRelationType::Intersects => 
Ok(GpuSpatialRelationPredicate::Intersects),
+            SpatialRelationType::Within => 
Ok(GpuSpatialRelationPredicate::Within),
+            SpatialRelationType::CoveredBy => 
Ok(GpuSpatialRelationPredicate::CoveredBy),
+            _ => {
+                // This should not happen as we check for supported predicates 
earlier
+                Err(DataFusionError::Execution(format!(
+                    "Unsupported spatial relation type for GPU: {:?}",
+                    t
+                )))
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl SpatialIndex for GPUSpatialIndex {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    #[allow(dead_code)] // This is used for tests
+    fn num_indexed_batches(&self) -> usize {
+        self.indexed_batches.len()
+    }
+    fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch {
+        &self.indexed_batches[batch_idx].batch
+    }
+
+    /// This method implements [`SpatialIndex::query_batch`] with 
GPU-accelerated spatial filtering
+    /// and refinement. It takes a batch of probe geometries and a range of 
row indices to process,
+    /// performs a spatial query on the GPU to find candidate matches,
+    /// refines the candidates using the GPU refiner,
+    /// and returns the matching build batch positions and probe indices.
+    async fn query_batch(
+        &self,
+        evaluated_batch: &Arc<EvaluatedBatch>,
+        range: Range<usize>,
+        _max_result_size: usize,
+        build_batch_positions: &mut Vec<(i32, i32)>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<(QueryResultMetrics, usize)> {
+        if range.is_empty() {
+            return Ok((
+                QueryResultMetrics {
+                    count: 0,
+                    candidate_count: 0,
+                },
+                range.start,
+            ));
+        }
+        let index = &self.index.as_ref();
+
+        let empty_rect = Rect::new(
+            coord!(x: f32::NAN, y: f32::NAN),
+            coord!(x: f32::NAN, y: f32::NAN),
+        );
+        let rects: Vec<_> = range
+            .clone()
+            .map(|row_idx| 
evaluated_batch.geom_array.rects()[row_idx].unwrap_or(empty_rect))
+            .collect();
+
+        let (mut gpu_build_indices, mut gpu_probe_indices) =
+            index.probe(rects.as_ref()).map_err(|e| {
+                DataFusionError::Execution(format!("GPU spatial query failed: 
{:?}", e))
+            })?;
+
+        assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+        let candidate_count = gpu_build_indices.len();
+
+        self.refine(
+            evaluated_batch.geom_array.geometry_array(),
+            &self.spatial_predicate,
+            &mut gpu_build_indices,
+            &mut gpu_probe_indices,
+        )?;
+
+        assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+        let total_count = gpu_build_indices.len();
+
+        for (build_idx, probe_idx) in 
gpu_build_indices.iter().zip(gpu_probe_indices.iter()) {
+            let data_id = *build_idx as usize;
+            let (batch_idx, row_idx) = self.data_id_to_batch_pos[data_id];
+            build_batch_positions.push((batch_idx, row_idx));
+            probe_indices.push(range.start as u32 + probe_idx);
+        }
+        Ok((
+            QueryResultMetrics {
+                count: total_count,
+                candidate_count,
+            },
+            range.end,
+        ))
+    }
+    fn need_more_probe_stats(&self) -> bool {
+        false
+    }
+
+    fn merge_probe_stats(&self, stats: GeoStatistics) {
+        let _ = stats;
+    }

Review Comment:
   ```suggestion
       fn merge_probe_stats(&self, _stats: GeoStatistics) {}
   ```



##########
rust/sedona-spatial-join-gpu/src/join_provider.rs:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::GpuSpatialIndexBuilder;
+use arrow_array::ArrayRef;
+use arrow_schema::SchemaRef;
+use datafusion_common::JoinType;
+use datafusion_common::Result;
+use geo_index::rtree::util::f64_box_to_f32;
+use geo_types::{coord, Rect};
+use sedona_common::SpatialJoinOptions;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::executor::IterGeo;
+use sedona_geo_generic_alg::BoundingRect;
+use sedona_schema::datatypes::SedonaType;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::join_provider::SpatialJoinProvider;
+use sedona_spatial_join::operand_evaluator::{
+    EvaluatedGeometryArray, EvaluatedGeometryArrayFactory,
+};
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::Arc;
+use wkb::reader::GeometryType;
+
+#[derive(Debug)]
+pub(crate) struct GpuSpatialJoinProvider;
+
+impl SpatialJoinProvider for GpuSpatialJoinProvider {
+    fn try_new_spatial_index_builder(
+        &self,
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Result<Box<dyn SpatialIndexBuilder>> {
+        let builder = GpuSpatialIndexBuilder::new(
+            schema,
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+        );
+        Ok(Box::new(builder))
+    }
+
+    fn estimate_extra_memory_usage(
+        &self,
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        GpuSpatialIndexBuilder::estimate_extra_memory_usage(geo_stats, 
spatial_predicate, options)
+    }
+
+    fn evaluated_array_factory(&self) -> Arc<dyn 
EvaluatedGeometryArrayFactory> {
+        Arc::new(DefaultGeometryArrayFactory)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct DefaultGeometryArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for DefaultGeometryArrayFactory {
+    fn try_new_evaluated_array(
+        &self,
+        geometry_array: ArrayRef,
+        sedona_type: &SedonaType,
+    ) -> Result<EvaluatedGeometryArray> {
+        let num_rows = geometry_array.len();
+        let mut rect_vec = Vec::with_capacity(num_rows);
+        geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
+            let rect_opt = if let Some(wkb) = &wkb_opt {
+                if let Some(rect) = wkb.bounding_rect() {
+                    let min = rect.min();
+                    let max = rect.max();
+                    // Why conservative bounding boxes prevent false negatives:
+                    // 1. P32 = round_nearest(P64), so P32 is the closest 
possible float to P64.
+                    // 2. Min32 = round_down(Min64), guaranteeing Min32 <= 
Min64.
+                    // 3. Max32 = round_up(Max64), guaranteeing Max32 >= Max64.
+                    // If P64 is inside Box64 (Min64 <= P64 <= Max64), P32 
cannot fall outside Box32.
+                    // If P32 < Min32, it would mean Min32 is closer to P64 
than P32 is, which
+                    // contradicts P32 being the nearest float. Therefore, 
false negatives are impossible.

Review Comment:
   The only thing I am not sure about here is the case where spilling occurs, 
which I think is the only place these rectangles are used except to pass them 
to the SpatialIndex. They're used to split batches up into partitions (where 
the approximation is fine) but I think the bounds of each partition are 
calculated using them as well. No need to solve that here, just pointing it out 
since I recently spent some time with those internals.
   
   You may want to allude to why the false negatives are prevented specifically 
for points with a reference to how they are sent to the GPU (not obvious from 
the current comment).



##########
rust/sedona-common/src/option.rs:
##########
@@ -114,6 +114,9 @@ config_namespace! {
 
         /// Options for debugging or testing spatial join
         pub debug : SpatialJoinDebugOptions, default = 
SpatialJoinDebugOptions::default()
+
+        /// GPU acceleration options
+        pub gpu: GpuOptions, default = GpuOptions::default()

Review Comment:
   These config options can also live in sedona-spatial-join-gpu (see how 
sedona-pointcloud does this to ensure the options are injected at the 
appropriate time).



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs:
##########
@@ -0,0 +1,765 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::BooleanBufferBuilder;
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::{DataFusionError, Result};
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{ExecutionMode, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{
+    GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner, 
GpuSpatialRelationPredicate,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndex;
+use sedona_spatial_join::index::QueryResultMetrics;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::SpatialPredicate;
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use wkb::reader::Wkb;
+
+pub struct GPUSpatialIndex {
+    pub(crate) schema: SchemaRef,
+    pub(crate) _options: SpatialJoinOptions,
+    /// GPU spatial index for performing GPU-accelerated filtering
+    pub(crate) index: Arc<GpuSpatialIndex>,
+    /// GPU spatial refiner for performing GPU-accelerated refinement
+    pub(crate) refiner: Arc<GpuSpatialRefiner>,
+    pub(crate) spatial_predicate: SpatialPredicate,
+    /// Indexed batches containing evaluated geometry arrays. It contains the 
original record
+    /// batches and geometry arrays obtained by evaluating the geometry 
expression on the build side.
+    pub(crate) indexed_batches: Vec<EvaluatedBatch>,
+    /// An array for translating data index to geometry batch index and row 
index
+    pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>,
+    /// Shared bitmap builders for visited left indices, one per batch
+    pub(crate) visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+    /// Counter of running probe-threads, potentially able to update `bitmap`.
+    /// Each time a probe thread finished probing the index, it will decrement 
the counter.
+    /// The last finished probe thread will produce the extra output batches 
for unmatched
+    /// build side when running left-outer joins. See also 
[`report_probe_completed`].
+    pub(crate) probe_threads_counter: AtomicUsize,
+}
+impl GPUSpatialIndex {
+    pub fn empty(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        options: SpatialJoinOptions,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        let gpu_options = GpuSpatialOptions {
+            cuda_use_memory_pool: options.gpu.use_memory_pool,
+            cuda_memory_pool_init_percent: 
options.gpu.memory_pool_init_percentage as i32,
+            concurrency: 1,
+            device_id: options.gpu.device_id as i32,
+            compress_bvh: options.gpu.compress_bvh,
+            pipeline_batches: options.gpu.pipeline_batches as u32,
+        };
+
+        Ok(Self {
+            schema,
+            _options: options,
+            spatial_predicate,
+            index: Arc::new(
+                GpuSpatialIndex::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            refiner: Arc::new(
+                GpuSpatialRefiner::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            indexed_batches: vec![],
+            data_id_to_batch_pos: vec![],
+            visited_build_side: None,
+            probe_threads_counter,
+        })
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(

Review Comment:
   Instead of the `allow()` here, it's usually more readable to just remove the 
`new()` constructor and construct these using the named struct syntax.



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs:
##########
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::gpu_spatial_index::GPUSpatialIndex;
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use futures::StreamExt;
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions, 
GpuSpatialRefiner};
+use 
sedona_spatial_join::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::utils::join_utils::need_produce_result_in_final;
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub(crate) struct GPUSpatialIndexBuilder {
+    schema: SchemaRef,
+    spatial_predicate: SpatialPredicate,
+    options: SpatialJoinOptions,
+    join_type: JoinType,
+    probe_threads_count: usize,
+    metrics: SpatialJoinBuildMetrics,
+    /// Batches to be indexed
+    indexed_batches: Vec<EvaluatedBatch>,
+    /// Statistics for indexed geometries
+    memory_used: usize,
+}
+
+impl GPUSpatialIndexBuilder {
+    pub(crate) fn is_using_gpu(
+        spatial_predicate: &SpatialPredicate,
+        join_opts: &SpatialJoinOptions,
+    ) -> Result<bool> {
+        if join_opts.gpu.enable {
+            if Self::is_spatial_predicate_supported_on_gpu(spatial_predicate) {
+                return Ok(true);
+            } else if join_opts.gpu.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(false);
+            } else {
+                return Err(DataFusionError::Execution("GPU spatial join is 
enabled, but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+        Ok(false)
+    }
+
+    fn is_spatial_predicate_supported_on_gpu(spatial_predicate: 
&SpatialPredicate) -> bool {
+        match spatial_predicate {
+            SpatialPredicate::Relation(rel) => match rel.relation_type {
+                SpatialRelationType::Intersects => true,
+                SpatialRelationType::Contains => true,
+                SpatialRelationType::Within => true,
+                SpatialRelationType::Covers => true,
+                SpatialRelationType::CoveredBy => true,
+                SpatialRelationType::Touches => true,
+                SpatialRelationType::Crosses => false,
+                SpatialRelationType::Overlaps => false,
+                SpatialRelationType::Equals => true,
+            },
+            SpatialPredicate::Distance(_) => false,
+            SpatialPredicate::KNearestNeighbors(_) => false,
+        }
+    }
+    pub fn new(
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Self {
+        Self {
+            schema,
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+            indexed_batches: vec![],
+            memory_used: 0,
+        }
+    }
+
+    fn build_visited_bitmaps(&mut self) -> 
Result<Option<Mutex<Vec<BooleanBufferBuilder>>>> {
+        if !need_produce_result_in_final(self.join_type) {
+            return Ok(None);
+        }
+
+        let mut bitmaps = Vec::with_capacity(self.indexed_batches.len());
+        let mut total_buffer_size = 0;
+
+        for batch in &self.indexed_batches {
+            let batch_rows = batch.batch.num_rows();
+            let buffer_size = batch_rows.div_ceil(8);
+            total_buffer_size += buffer_size;
+
+            let mut bitmap = BooleanBufferBuilder::new(batch_rows);
+            bitmap.append_n(batch_rows, false);
+            bitmaps.push(bitmap);
+        }
+
+        self.record_memory_usage(total_buffer_size);
+
+        Ok(Some(Mutex::new(bitmaps)))
+    }
+
+    fn record_memory_usage(&mut self, bytes: usize) {
+        self.memory_used += bytes;
+        self.metrics.build_mem_used.set_max(self.memory_used);
+    }
+    /// Add a geometry batch to be indexed.
+    ///
+    /// This method accumulates geometry batches that will be used to build 
the spatial index.
+    /// Each batch contains processed geometry data along with memory usage 
information.
+    fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
+        let in_mem_size = indexed_batch.in_mem_size()?;
+        self.indexed_batches.push(indexed_batch);
+        self.record_memory_usage(in_mem_size);
+        Ok(())
+    }
+
+    pub(crate) fn estimate_extra_memory_usage(
+        geo_stats: &GeoStatistics,
+        _spatial_predicate: &SpatialPredicate,
+        _options: &SpatialJoinOptions,
+    ) -> usize {
+        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+        // Each geometry requires 4 f32 values to store the bounding rectangle 
(min_x, min_y, max_x, max_y)
+        num_geoms * (4 * 4)
+    }
+}
+fn concat_evaluated_batches(batches: &[EvaluatedBatch]) -> 
Result<EvaluatedBatch> {
+    if batches.is_empty() {
+        return sedona_internal_err!("Cannot concatenate empty list of 
EvaluatedBatches");
+    }
+
+    // 1. Concatenate the underlying Arrow RecordBatches
+    let schema = batches[0].schema();
+    let record_batches: Vec<&RecordBatch> = batches.iter().map(|b| 
&b.batch).collect();
+    let concatenated_batch = concat_batches(&schema, record_batches)?;
+
+    // 2. Prepare for Geometry Interleaving
+    // We need to create a list of (batch_index, row_index) for every row in 
order
+    let mut indices = Vec::with_capacity(concatenated_batch.num_rows());
+    for (batch_idx, batch) in batches.iter().enumerate() {
+        for row_idx in 0..batch.num_rows() {
+            indices.push((batch_idx, row_idx));
+        }
+    }
+
+    // 3. Concatenate Geometry Arrays using the interleave method
+    let geom_arrays: Vec<&EvaluatedGeometryArray> = batches.iter().map(|b| 
&b.geom_array).collect();
+
+    let concatenated_geom_array = 
EvaluatedGeometryArray::interleave(&geom_arrays, &indices)?;

Review Comment:
   No need to solve it here, but it might be faster if we provided an 
`EvaluatedGeometryArray::concatenate()`



##########
rust/sedona-spatial-join-gpu/Cargo.toml:
##########
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+[package]
+name = "sedona-spatial-join-gpu"
+version.workspace = true
+license.workspace = true
+keywords.workspace = true
+categories.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[lints.clippy]
+result_large_err = "allow"
+
+[features]
+gpu = ["sedona-libgpuspatial/gpu"]
+
+[dependencies]
+arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { workspace = true }
+datafusion-common = { workspace = true }

Review Comment:
   Can any of these dependencies be removed?



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs:
##########
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::gpu_spatial_index::GPUSpatialIndex;
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use futures::StreamExt;
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions, 
GpuSpatialRefiner};
+use 
sedona_spatial_join::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::utils::join_utils::need_produce_result_in_final;
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub(crate) struct GPUSpatialIndexBuilder {
+    schema: SchemaRef,
+    spatial_predicate: SpatialPredicate,
+    options: SpatialJoinOptions,
+    join_type: JoinType,
+    probe_threads_count: usize,
+    metrics: SpatialJoinBuildMetrics,
+    /// Batches to be indexed
+    indexed_batches: Vec<EvaluatedBatch>,
+    /// Statistics for indexed geometries
+    memory_used: usize,
+}
+
+impl GPUSpatialIndexBuilder {
+    pub(crate) fn is_using_gpu(
+        spatial_predicate: &SpatialPredicate,
+        join_opts: &SpatialJoinOptions,
+    ) -> Result<bool> {
+        if join_opts.gpu.enable {
+            if Self::is_spatial_predicate_supported_on_gpu(spatial_predicate) {
+                return Ok(true);
+            } else if join_opts.gpu.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(false);
+            } else {
+                return Err(DataFusionError::Execution("GPU spatial join is 
enabled, but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+        Ok(false)
+    }
+
+    fn is_spatial_predicate_supported_on_gpu(spatial_predicate: 
&SpatialPredicate) -> bool {
+        match spatial_predicate {
+            SpatialPredicate::Relation(rel) => match rel.relation_type {
+                SpatialRelationType::Intersects => true,
+                SpatialRelationType::Contains => true,
+                SpatialRelationType::Within => true,
+                SpatialRelationType::Covers => true,
+                SpatialRelationType::CoveredBy => true,
+                SpatialRelationType::Touches => true,
+                SpatialRelationType::Crosses => false,
+                SpatialRelationType::Overlaps => false,
+                SpatialRelationType::Equals => true,
+            },
+            SpatialPredicate::Distance(_) => false,
+            SpatialPredicate::KNearestNeighbors(_) => false,
+        }
+    }
+    pub fn new(
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Self {
+        Self {
+            schema,
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+            indexed_batches: vec![],
+            memory_used: 0,
+        }
+    }
+
+    fn build_visited_bitmaps(&mut self) -> 
Result<Option<Mutex<Vec<BooleanBufferBuilder>>>> {
+        if !need_produce_result_in_final(self.join_type) {
+            return Ok(None);
+        }
+
+        let mut bitmaps = Vec::with_capacity(self.indexed_batches.len());
+        let mut total_buffer_size = 0;
+
+        for batch in &self.indexed_batches {
+            let batch_rows = batch.batch.num_rows();
+            let buffer_size = batch_rows.div_ceil(8);
+            total_buffer_size += buffer_size;
+
+            let mut bitmap = BooleanBufferBuilder::new(batch_rows);
+            bitmap.append_n(batch_rows, false);
+            bitmaps.push(bitmap);
+        }
+
+        self.record_memory_usage(total_buffer_size);
+
+        Ok(Some(Mutex::new(bitmaps)))
+    }
+
+    fn record_memory_usage(&mut self, bytes: usize) {
+        self.memory_used += bytes;
+        self.metrics.build_mem_used.set_max(self.memory_used);
+    }
+    /// Add a geometry batch to be indexed.
+    ///
+    /// This method accumulates geometry batches that will be used to build 
the spatial index.
+    /// Each batch contains processed geometry data along with memory usage 
information.
+    fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
+        let in_mem_size = indexed_batch.in_mem_size()?;
+        self.indexed_batches.push(indexed_batch);
+        self.record_memory_usage(in_mem_size);
+        Ok(())
+    }
+
+    pub(crate) fn estimate_extra_memory_usage(
+        geo_stats: &GeoStatistics,
+        _spatial_predicate: &SpatialPredicate,
+        _options: &SpatialJoinOptions,
+    ) -> usize {
+        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+        // Each geometry requires 4 f32 values to store the bounding rectangle 
(min_x, min_y, max_x, max_y)
+        num_geoms * (4 * 4)

Review Comment:
   If these are stored on the GPU you might not have to count them (either way 
it may be good to leave a note here about how much memory might be required on 
the GPU given this estimate in addition to the CPU memory for the index + 
refiner if there is any).



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs:
##########
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::gpu_spatial_index::GPUSpatialIndex;
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use futures::StreamExt;
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions, 
GpuSpatialRefiner};
+use 
sedona_spatial_join::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::utils::join_utils::need_produce_result_in_final;
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub(crate) struct GPUSpatialIndexBuilder {
+    schema: SchemaRef,
+    spatial_predicate: SpatialPredicate,
+    options: SpatialJoinOptions,
+    join_type: JoinType,
+    probe_threads_count: usize,
+    metrics: SpatialJoinBuildMetrics,
+    /// Batches to be indexed
+    indexed_batches: Vec<EvaluatedBatch>,
+    /// Statistics for indexed geometries
+    memory_used: usize,
+}
+
+impl GPUSpatialIndexBuilder {
+    pub(crate) fn is_using_gpu(
+        spatial_predicate: &SpatialPredicate,
+        join_opts: &SpatialJoinOptions,
+    ) -> Result<bool> {
+        if join_opts.gpu.enable {
+            if Self::is_spatial_predicate_supported_on_gpu(spatial_predicate) {
+                return Ok(true);
+            } else if join_opts.gpu.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(false);
+            } else {
+                return Err(DataFusionError::Execution("GPU spatial join is 
enabled, but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+        Ok(false)
+    }
+
+    fn is_spatial_predicate_supported_on_gpu(spatial_predicate: 
&SpatialPredicate) -> bool {
+        match spatial_predicate {
+            SpatialPredicate::Relation(rel) => match rel.relation_type {
+                SpatialRelationType::Intersects => true,
+                SpatialRelationType::Contains => true,
+                SpatialRelationType::Within => true,
+                SpatialRelationType::Covers => true,
+                SpatialRelationType::CoveredBy => true,
+                SpatialRelationType::Touches => true,
+                SpatialRelationType::Crosses => false,
+                SpatialRelationType::Overlaps => false,
+                SpatialRelationType::Equals => true,
+            },
+            SpatialPredicate::Distance(_) => false,
+            SpatialPredicate::KNearestNeighbors(_) => false,
+        }
+    }

Review Comment:
   This is different than the list you have in planner.rs...is that intentional?



##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs:
##########
@@ -0,0 +1,765 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::BooleanBufferBuilder;
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::{DataFusionError, Result};
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{ExecutionMode, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{
+    GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner, 
GpuSpatialRelationPredicate,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndex;
+use sedona_spatial_join::index::QueryResultMetrics;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::SpatialPredicate;
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use wkb::reader::Wkb;
+
+pub struct GPUSpatialIndex {
+    pub(crate) schema: SchemaRef,
+    pub(crate) _options: SpatialJoinOptions,
+    /// GPU spatial index for performing GPU-accelerated filtering
+    pub(crate) index: Arc<GpuSpatialIndex>,
+    /// GPU spatial refiner for performing GPU-accelerated refinement
+    pub(crate) refiner: Arc<GpuSpatialRefiner>,
+    pub(crate) spatial_predicate: SpatialPredicate,
+    /// Indexed batches containing evaluated geometry arrays. It contains the 
original record
+    /// batches and geometry arrays obtained by evaluating the geometry 
expression on the build side.
+    pub(crate) indexed_batches: Vec<EvaluatedBatch>,
+    /// An array for translating data index to geometry batch index and row 
index
+    pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>,
+    /// Shared bitmap builders for visited left indices, one per batch
+    pub(crate) visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+    /// Counter of running probe-threads, potentially able to update `bitmap`.
+    /// Each time a probe thread finished probing the index, it will decrement 
the counter.
+    /// The last finished probe thread will produce the extra output batches 
for unmatched
+    /// build side when running left-outer joins. See also 
[`report_probe_completed`].
+    pub(crate) probe_threads_counter: AtomicUsize,
+}
+impl GPUSpatialIndex {
+    pub fn empty(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        options: SpatialJoinOptions,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        let gpu_options = GpuSpatialOptions {
+            cuda_use_memory_pool: options.gpu.use_memory_pool,
+            cuda_memory_pool_init_percent: 
options.gpu.memory_pool_init_percentage as i32,
+            concurrency: 1,
+            device_id: options.gpu.device_id as i32,
+            compress_bvh: options.gpu.compress_bvh,
+            pipeline_batches: options.gpu.pipeline_batches as u32,
+        };
+
+        Ok(Self {
+            schema,
+            _options: options,
+            spatial_predicate,
+            index: Arc::new(
+                GpuSpatialIndex::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            refiner: Arc::new(
+                GpuSpatialRefiner::try_new(&gpu_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            indexed_batches: vec![],
+            data_id_to_batch_pos: vec![],
+            visited_build_side: None,
+            probe_threads_counter,
+        })
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        options: SpatialJoinOptions,
+        index: Arc<GpuSpatialIndex>,
+        refiner: Arc<GpuSpatialRefiner>,
+        indexed_batches: Vec<EvaluatedBatch>,
+        data_id_to_batch_pos: Vec<(i32, i32)>,
+        visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        Ok(Self {
+            schema,
+            _options: options,
+            spatial_predicate,
+            index,
+            refiner,
+            indexed_batches,
+            data_id_to_batch_pos,
+            visited_build_side,
+            probe_threads_counter,
+        })
+    }
+
+    fn refine(
+        &self,
+        probe_geoms: &ArrayRef,
+        predicate: &SpatialPredicate,
+        build_indices: &mut Vec<u32>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<()> {
+        match predicate {
+            SpatialPredicate::Relation(rel_p) => {
+                self.refiner
+                    .refine(
+                        probe_geoms,
+                        Self::convert_relation_type(&rel_p.relation_type)?,
+                        build_indices,
+                        probe_indices,
+                    )
+                    .map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "GPU spatial refinement failed: {:?}",
+                            e
+                        ))
+                    })?;
+                Ok(())
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Only Relation predicate is supported for GPU spatial 
query".to_string(),
+            )),
+        }
+    }
+    // Translate Sedona SpatialRelationType to GpuSpatialRelationPredicate
+    fn convert_relation_type(t: &SpatialRelationType) -> 
Result<GpuSpatialRelationPredicate> {
+        match t {
+            SpatialRelationType::Equals => 
Ok(GpuSpatialRelationPredicate::Equals),
+            SpatialRelationType::Touches => 
Ok(GpuSpatialRelationPredicate::Touches),
+            SpatialRelationType::Contains => 
Ok(GpuSpatialRelationPredicate::Contains),
+            SpatialRelationType::Covers => 
Ok(GpuSpatialRelationPredicate::Covers),
+            SpatialRelationType::Intersects => 
Ok(GpuSpatialRelationPredicate::Intersects),
+            SpatialRelationType::Within => 
Ok(GpuSpatialRelationPredicate::Within),
+            SpatialRelationType::CoveredBy => 
Ok(GpuSpatialRelationPredicate::CoveredBy),
+            _ => {
+                // This should not happen as we check for supported predicates 
earlier
+                Err(DataFusionError::Execution(format!(
+                    "Unsupported spatial relation type for GPU: {:?}",
+                    t
+                )))
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl SpatialIndex for GPUSpatialIndex {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    #[allow(dead_code)] // This is used for tests

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to