pwrliang commented on code in PR #722:
URL: https://github.com/apache/sedona-db/pull/722#discussion_r3083703441


##########
rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs:
##########
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::gpu_spatial_index::GPUSpatialIndex;
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use futures::StreamExt;
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner};
+use sedona_spatial_join::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::utils::join_utils::need_produce_result_in_final;
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub(crate) struct GPUSpatialIndexBuilder {
+    schema: SchemaRef,
+    spatial_predicate: SpatialPredicate,
+    options: SpatialJoinOptions,
+    join_type: JoinType,
+    probe_threads_count: usize,
+    metrics: SpatialJoinBuildMetrics,
+    /// Batches to be indexed
+    indexed_batches: Vec<EvaluatedBatch>,
+    /// Running total of bytes recorded via `record_memory_usage`
+    memory_used: usize,
+}
+
+impl GPUSpatialIndexBuilder {
+    pub(crate) fn is_using_gpu(
+        spatial_predicate: &SpatialPredicate,
+        join_opts: &SpatialJoinOptions,
+    ) -> Result<bool> {
+        if join_opts.gpu.enable {
+            if Self::is_spatial_predicate_supported_on_gpu(spatial_predicate) {
+                return Ok(true);
+            } else if join_opts.gpu.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial predicate is not supported on GPU");
+                return Ok(false);
+            } else {
+                return Err(DataFusionError::Execution("GPU spatial join is enabled, but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+        Ok(false)
+    }
+
+    fn is_spatial_predicate_supported_on_gpu(spatial_predicate: &SpatialPredicate) -> bool {
+        match spatial_predicate {
+            SpatialPredicate::Relation(rel) => match rel.relation_type {
+                SpatialRelationType::Intersects => true,
+                SpatialRelationType::Contains => true,
+                SpatialRelationType::Within => true,
+                SpatialRelationType::Covers => true,
+                SpatialRelationType::CoveredBy => true,
+                SpatialRelationType::Touches => true,
+                SpatialRelationType::Crosses => false,
+                SpatialRelationType::Overlaps => false,
+                SpatialRelationType::Equals => true,
+            },
+            SpatialPredicate::Distance(_) => false,
+            SpatialPredicate::KNearestNeighbors(_) => false,
+        }
+    }
+    pub fn new(
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Self {
+        Self {
+            schema,
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+            indexed_batches: vec![],
+            memory_used: 0,
+        }
+    }
+
+    fn build_visited_bitmaps(&mut self) -> Result<Option<Mutex<Vec<BooleanBufferBuilder>>>> {
+        if !need_produce_result_in_final(self.join_type) {
+            return Ok(None);
+        }
+
+        let mut bitmaps = Vec::with_capacity(self.indexed_batches.len());
+        let mut total_buffer_size = 0;
+
+        for batch in &self.indexed_batches {
+            let batch_rows = batch.batch.num_rows();
+            let buffer_size = batch_rows.div_ceil(8);
+            total_buffer_size += buffer_size;
+
+            let mut bitmap = BooleanBufferBuilder::new(batch_rows);
+            bitmap.append_n(batch_rows, false);
+            bitmaps.push(bitmap);
+        }
+
+        self.record_memory_usage(total_buffer_size);
+
+        Ok(Some(Mutex::new(bitmaps)))
+    }
+
+    fn record_memory_usage(&mut self, bytes: usize) {
+        self.memory_used += bytes;
+        self.metrics.build_mem_used.set_max(self.memory_used);
+    }
+    /// Add a geometry batch to be indexed.
+    ///
+    /// This method accumulates geometry batches that will be used to build the spatial index.
+    /// Each batch contains processed geometry data along with memory usage information.
+    fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
+        let in_mem_size = indexed_batch.in_mem_size()?;
+        self.indexed_batches.push(indexed_batch);
+        self.record_memory_usage(in_mem_size);
+        Ok(())
+    }
+
+    pub(crate) fn estimate_extra_memory_usage(
+        geo_stats: &GeoStatistics,
+        _spatial_predicate: &SpatialPredicate,
+        _options: &SpatialJoinOptions,
+    ) -> usize {
+        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+        // Each geometry requires 4 f32 values to store the bounding rectangle (min_x, min_y, max_x, max_y)
+        num_geoms * (4 * 4)
+    }
+}
+fn concat_evaluated_batches(batches: &[EvaluatedBatch]) -> Result<EvaluatedBatch> {
+    if batches.is_empty() {
+        return sedona_internal_err!("Cannot concatenate empty list of EvaluatedBatches");
+    }
+
+    // 1. Concatenate the underlying Arrow RecordBatches
+    let schema = batches[0].schema();
+    let record_batches: Vec<&RecordBatch> = batches.iter().map(|b| &b.batch).collect();
+    let concatenated_batch = concat_batches(&schema, record_batches)?;
+
+    // 2. Prepare for Geometry Interleaving
+    // We need to create a list of (batch_index, row_index) for every row in order
+    let mut indices = Vec::with_capacity(concatenated_batch.num_rows());
+    for (batch_idx, batch) in batches.iter().enumerate() {
+        for row_idx in 0..batch.num_rows() {
+            indices.push((batch_idx, row_idx));
+        }
+    }
+
+    // 3. Concatenate Geometry Arrays using the interleave method
+    let geom_arrays: Vec<&EvaluatedGeometryArray> = batches.iter().map(|b| &b.geom_array).collect();
+
+    let concatenated_geom_array = EvaluatedGeometryArray::interleave(&geom_arrays, &indices)?;

Review Comment:
   https://github.com/apache/sedona-db/issues/763



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to