zhangfengcdt commented on code in PR #775:
URL: https://github.com/apache/sedona-db/pull/775#discussion_r3138556561


##########
rust/sedona-spatial-join-geography/src/join_provider.rs:
##########
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, Float64Array};
+use datafusion_common::{exec_datafusion_err, JoinType, Result, ScalarValue};
+use datafusion_physical_plan::ColumnarValue;
+use geo_index::rtree::util::f64_box_to_f32;
+use geo_types::{coord, Rect};
+use sedona_common::sedona_internal_err;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::executor::IterGeo;
+use sedona_s2geography::{
+    geography::{Geography, GeographyFactory},
+    rect_bounder::RectBounder,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_spatial_join::{
+    index::{
+        default_spatial_index_builder::DefaultSpatialIndexBuilder,
+        spatial_index_builder::SpatialJoinBuildMetrics, SpatialIndexBuilder,
+    },
+    join_provider::SpatialJoinProvider,
+    operand_evaluator::{EvaluatedGeometryArray, EvaluatedGeometryArrayFactory},
+    SpatialJoinOptions, SpatialPredicate,
+};
+
+use crate::{
+    refiner::GeographyRefinerFactory, 
spatial_index_builder::GeographySpatialIndexBuilder,
+};
+
+#[derive(Debug)]
+pub(crate) struct GeographyJoinProvider;
+
+impl SpatialJoinProvider for GeographyJoinProvider {
+    fn try_new_spatial_index_builder(
+        &self,
+        schema: arrow_schema::SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Result<Box<dyn SpatialIndexBuilder>> {
+        // Create the inner (default) builder
+        let builder = GeographySpatialIndexBuilder::new(
+            schema,
+            spatial_predicate.clone(),
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+        )?;
+
+        Ok(Box::new(builder))
+    }
+
+    fn estimate_extra_memory_usage(
+        &self,
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        DefaultSpatialIndexBuilder::estimate_extra_memory_usage(
+            geo_stats,
+            spatial_predicate,
+            options,
+            Arc::new(GeographyRefinerFactory),
+        )
+    }
+
+    fn evaluated_array_factory(&self) -> Arc<dyn 
EvaluatedGeometryArrayFactory> {
+        Arc::new(GeographyEvaluatedArrayFactory)
+    }
+}
+
+#[derive(Debug)]
+struct GeographyEvaluatedArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for GeographyEvaluatedArrayFactory {
+    fn try_new_evaluated_array(
+        &self,
+        geometry_array: ArrayRef,
+        sedona_type: &SedonaType,
+        distance_columnar_value: Option<&ColumnarValue>,
+    ) -> Result<EvaluatedGeometryArray> {
+        // Without distance expansion use the impl without a bounder modifier
+        let Some(distance_columnar_value) = distance_columnar_value else {
+            return try_new_evaluated_array_impl(geometry_array, sedona_type, 
|_bounder| {});
+        };
+
+        let result = match distance_columnar_value {
+            ColumnarValue::Scalar(ScalarValue::Float64(Some(distance))) => {
+                try_new_evaluated_array_impl(geometry_array, sedona_type, 
|bounder| {
+                    bounder.expand_by_distance(*distance);
+                })
+            }
+            ColumnarValue::Scalar(ScalarValue::Float64(None)) => {
+                try_new_evaluated_array_impl(geometry_array, sedona_type, 
|bounder| {
+                    bounder.clear();
+                })
+            }
+            ColumnarValue::Array(array) => {
+                if let Some(array) = 
array.as_any().downcast_ref::<Float64Array>() {
+                    let mut distance_iter = array.iter();
+                    try_new_evaluated_array_impl(geometry_array, sedona_type, 
|bounder| {
+                        if let Some(next_distance) = distance_iter
+                            .next()
+                            .expect("distance array has correct length")
+                        {
+                            // Non null distance
+                            bounder.expand_by_distance(next_distance);
+                        } else {
+                            // Null distance
+                            bounder.clear();
+                        }
+                    })
+                } else {
+                    return sedona_internal_err!("Distance columnar value is 
not a Float64Array");
+                }
+            }
+            _ => {
+                return sedona_internal_err!("Distance columnar value is not a 
Float64");
+            }
+        }?;
+
+        // Store the distance value so it can be used during refinement
+        Ok(result.with_distance(Some(distance_columnar_value.clone())))
+    }
+}
+
+fn try_new_evaluated_array_impl(
+    geometry_array: ArrayRef,
+    sedona_type: &SedonaType,
+    mut modify_bounder: impl FnMut(&mut RectBounder),
+) -> Result<EvaluatedGeometryArray> {
+    // Compute rectangles from wkb using the RectBounder from s2geography
+    let num_rows = geometry_array.len();
+    let mut bounder = RectBounder::new();
+    let mut factory = GeographyFactory::new();
+    let mut geog = Geography::new();
+    let mut rect_vec = Vec::with_capacity(num_rows);
+
+    geometry_array.iter_as_wkb_bytes(sedona_type, num_rows, |wkb_opt| {
+        if let Some(wkb) = wkb_opt {
+            bounder.clear();
+            factory.init_from_wkb(wkb, &mut geog).map_err(|e| {
+                exec_datafusion_err!("Failed to read WKB in evaluated array 
factory: {e}")
+            })?;
+            bounder.bound(&geog).map_err(|e| {
+                exec_datafusion_err!("Failed to bound geography in evaluated 
array factory: {e}")
+            })?;
+
+            // Account for distance (either by expanding the bounds or by 
emptying them for a null
+            // distance).
+            modify_bounder(&mut bounder);
+
+            let maybe_bounds = bounder.finish().map_err(|e| {
+                exec_datafusion_err!("Failed to finish bounding in evaluated 
array factory: {e}")
+            })?;
+            if let Some((mut min_x, min_y, mut max_x, max_y)) = maybe_bounds {
+                // The evaluated geometry array currently needs Cartesian 
rectangles; however
+                // we can still recalculate these when we ingest into the 
index. In the
+                // partitioned join we may want to ensure we can express 
bounds in a way that
+                // the partitioner understands (if it doesn't already) to do a 
better job
+                // partitioning wraparounds.
+                if min_x > max_x {

Review Comment:
   This could make the shape that across antimeridian return a full longitude 
range, right? Is this because of the limitation of r-tree not being able to 
process cross-antimeridian geog correctly?



##########
rust/sedona-spatial-join-geography/src/refiner.rs:
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Geography-specific refiner for spatial join predicate evaluation using 
s2geography.
+//!
+//! This module provides a refiner that evaluates spatial predicates on the 
sphere
+//! using s2geography, rather than the default Cartesian predicates.
+
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+};
+
+use datafusion_common::{exec_datafusion_err, Result};
+use sedona_common::{sedona_internal_err, ExecutionMode, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_s2geography::{
+    geography::{Geography, GeographyFactory},
+    operator::{Op, OpType},
+};
+use sedona_spatial_join::{
+    refine::IndexQueryResultRefinerFactory,
+    spatial_predicate::{RelationPredicate, SpatialRelationType},
+    utils::init_once_array::InitOnceArray,
+    IndexQueryResult, IndexQueryResultRefiner, SpatialPredicate,
+};
+use wkb::reader::Wkb;
+
+#[derive(Debug)]
+pub(crate) struct GeographyRefiner {
+    op_type: OpType,
+    prepared_geoms: InitOnceArray<Option<Geography<'static>>>,
+    mem_used: AtomicUsize,
+}
+
+impl GeographyRefiner {
+    pub fn new(
+        predicate: SpatialPredicate,
+        options: &SpatialJoinOptions,
+        num_build_geoms: usize,
+    ) -> Result<Self> {
+        let op_type = match predicate {
+            SpatialPredicate::Relation(RelationPredicate {
+                left: _,
+                right: _,
+                relation_type,
+            }) => match relation_type {
+                SpatialRelationType::Intersects => OpType::Intersects,
+                SpatialRelationType::Contains => OpType::Contains,
+                SpatialRelationType::Within => OpType::Within,
+                SpatialRelationType::Equals => OpType::Equals,
+                _ => {
+                    return sedona_internal_err!(
+                        "GeographyRefiner created with unsupported relation 
type {relation_type}"
+                    )
+                }
+            },
+            SpatialPredicate::Distance(_) => OpType::DWithin,
+            _ => {
+                return sedona_internal_err!(
+                    "GeographyRefiner created with unsupported predicate 
{predicate}"
+                )
+            }
+        };
+
+        // Allow join options to turn off preparedness
+        let prepared_geoms = if !matches!(
+            options.execution_mode,
+            ExecutionMode::PrepareNone | ExecutionMode::PrepareProbe
+        ) {
+            InitOnceArray::new(num_build_geoms)
+        } else {
+            InitOnceArray::new(0)
+        };
+
+        let mem_used = AtomicUsize::new(prepared_geoms.allocated_size());
+
+        Ok(Self {
+            op_type,
+            prepared_geoms,
+            mem_used,
+        })
+    }
+}
+
+impl IndexQueryResultRefiner for GeographyRefiner {
+    fn refine(
+        &self,
+        probe: &Wkb<'_>,
+        index_query_results: &[IndexQueryResult],
+    ) -> Result<Vec<(i32, i32)>> {
+        let mut results = Vec::with_capacity(index_query_results.len());
+        let mut op = Op::new(self.op_type);
+
+        // We may want thread local factories/build_geog/probe_geog here
+        let mut factory = GeographyFactory::new();
+        let mut probe_geog = factory
+            .from_wkb(probe.buf())
+            .map_err(|e| exec_datafusion_err!("{e}"))?;
+
+        let mut build_geog = Geography::new();
+
+        // Crude heuristic used by the S2Loop (build an index after 20 
unindexed
+        // contains queries even for small loops).
+        if probe.buf().len() > (32 * 2 * size_of::<f64>()) || 
index_query_results.len() > 20 {
+            probe_geog
+                .prepare()
+                .map_err(|e| exec_datafusion_err!("{e}"))?;
+        }
+
+        if !self.prepared_geoms.is_empty() {
+            // We're in prepared build mode
+            for result in index_query_results {
+                let (prepared_build_geom, is_newly_inserted) =
+                    self.prepared_geoms.get_or_create(result.geom_idx, || {
+                        // Basically, prepare anything except points on the 
build side
+                        if result.wkb.buf().len() > 32 {
+                            let mut geog = factory
+                                .from_wkb(result.wkb.buf())
+                                .map_err(|e| exec_datafusion_err!("{e}"))?;
+                            geog.prepare().map_err(|e| 
exec_datafusion_err!("{e}"))?;
+                            Ok(Some(unsafe {
+                                // Safety: the evaluated batches keep the 
required WKB alive. The
+                                // refiner always outlives the evaluated 
batches.
+                                std::mem::transmute::<Geography<'_>, 
Geography<'static>>(geog)
+                            }))
+                        } else {
+                            Ok(None)
+                        }
+                    })?;
+
+                let build_geog_ref = if let Some(prepared_geog) = 
prepared_build_geom {
+                    if is_newly_inserted {
+                        self.mem_used
+                            .fetch_add(prepared_geog.mem_used(), 
Ordering::Relaxed);
+                    }
+
+                    prepared_geog
+                } else {
+                    factory
+                        .init_from_wkb(result.wkb.buf(), &mut build_geog)
+                        .map_err(|e| exec_datafusion_err!("{e}"))?;
+                    &build_geog
+                };
+
+                let eval = if matches!(self.op_type, OpType::DWithin) {
+                    if let Some(distance) = result.distance {
+                        op.eval_binary_distance_predicate(build_geog_ref, 
&probe_geog, distance)
+                    } else {
+                        Ok(false)
+                    }
+                } else {
+                    op.eval_binary_predicate(build_geog_ref, &probe_geog)
+                };
+
+                if eval.map_err(|e| exec_datafusion_err!("{e}"))? {
+                    results.push(result.position);
+                }
+            }
+        } else {
+            // We are not in prepared build mode
+            for result in index_query_results {
+                factory
+                    .init_from_wkb(result.wkb.buf(), &mut build_geog)
+                    .map_err(|e| exec_datafusion_err!("{e}"))?;
+
+                let eval = if matches!(self.op_type, OpType::DWithin) {
+                    if let Some(distance) = result.distance {
+                        op.eval_binary_distance_predicate(&build_geog, 
&probe_geog, distance)
+                    } else {
+                        Ok(false)
+                    }
+                } else {
+                    op.eval_binary_predicate(&build_geog, &probe_geog)
+                };
+
+                if eval.map_err(|e| exec_datafusion_err!("{e}"))? {
+                    results.push(result.position);
+                }
+            }
+        }
+
+        Ok(results)
+    }
+
+    fn estimate_max_memory_usage(&self, build_stats: &GeoStatistics) -> usize {
+        // This estimate could be improved but is a crude heuristic based on:
+        //
+        // - In prepared build mode, we allocate the prepared geometry array
+        // - Points are never prepared and don't occupy any extra memory
+        // - The size of a prepared geometry once indexed is usually >500 bytes
+        //   but is usually larger based on the number of coordinates. This 
could
+        //   use a better estimate based on the mean byte size.
+        //
+        // Note this is called after the object has already been created but 
before
+        // the index has ever been probed. It should be an error to have 
build_stats
+        // where some components are None.
+
+        // If we never prepare anything, our memory usage is negligible
+        if self.prepared_geoms.is_empty() {
+            return 0;
+        }
+
+        let num_polygons = build_stats.polygonal_count().unwrap_or(0);
+        let num_lines = build_stats.lineal_count().unwrap_or(0);
+        (num_polygons + num_lines).try_into().unwrap_or(usize::MAX) * 500

Review Comment:
   Should it also count the number of vertex in the estimation as well? 



##########
rust/sedona-spatial-join-geography/src/join_provider.rs:
##########
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, Float64Array};
+use datafusion_common::{exec_datafusion_err, JoinType, Result, ScalarValue};
+use datafusion_physical_plan::ColumnarValue;
+use geo_index::rtree::util::f64_box_to_f32;
+use geo_types::{coord, Rect};
+use sedona_common::sedona_internal_err;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::executor::IterGeo;
+use sedona_s2geography::{
+    geography::{Geography, GeographyFactory},
+    rect_bounder::RectBounder,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_spatial_join::{
+    index::{
+        default_spatial_index_builder::DefaultSpatialIndexBuilder,
+        spatial_index_builder::SpatialJoinBuildMetrics, SpatialIndexBuilder,
+    },
+    join_provider::SpatialJoinProvider,
+    operand_evaluator::{EvaluatedGeometryArray, EvaluatedGeometryArrayFactory},
+    SpatialJoinOptions, SpatialPredicate,
+};
+
+use crate::{
+    refiner::GeographyRefinerFactory, 
spatial_index_builder::GeographySpatialIndexBuilder,
+};
+
+#[derive(Debug)]
+pub(crate) struct GeographyJoinProvider;
+
+impl SpatialJoinProvider for GeographyJoinProvider {
+    fn try_new_spatial_index_builder(
+        &self,
+        schema: arrow_schema::SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Result<Box<dyn SpatialIndexBuilder>> {
+        // Create the inner (default) builder
+        let builder = GeographySpatialIndexBuilder::new(
+            schema,
+            spatial_predicate.clone(),
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+        )?;
+
+        Ok(Box::new(builder))
+    }
+
+    fn estimate_extra_memory_usage(
+        &self,
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        DefaultSpatialIndexBuilder::estimate_extra_memory_usage(
+            geo_stats,
+            spatial_predicate,
+            options,
+            Arc::new(GeographyRefinerFactory),
+        )
+    }
+
+    fn evaluated_array_factory(&self) -> Arc<dyn 
EvaluatedGeometryArrayFactory> {
+        Arc::new(GeographyEvaluatedArrayFactory)
+    }
+}
+
+#[derive(Debug)]
+struct GeographyEvaluatedArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for GeographyEvaluatedArrayFactory {
+    fn try_new_evaluated_array(
+        &self,
+        geometry_array: ArrayRef,
+        sedona_type: &SedonaType,
+        distance_columnar_value: Option<&ColumnarValue>,
+    ) -> Result<EvaluatedGeometryArray> {
+        // Without distance expansion use the impl without a bounder modifier
+        let Some(distance_columnar_value) = distance_columnar_value else {
+            return try_new_evaluated_array_impl(geometry_array, sedona_type, 
|_bounder| {});
+        };
+
+        let result = match distance_columnar_value {
+            ColumnarValue::Scalar(ScalarValue::Float64(Some(distance))) => {

Review Comment:
   Do we need to match Int64 type as well? for example, ST_DWithin(geo, geo, 
10)? Would be nice to add this type of test (instead of 10.0) as well. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to