Copilot commented on code in PR #775: URL: https://github.com/apache/sedona-db/pull/775#discussion_r3133273728
########## rust/sedona-spatial-join-geography/src/refiner.rs: ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Geography-specific refiner for spatial join predicate evaluation using s2geography. +//! +//! This module provides a refiner that evaluates spatial predicates on the sphere +//! using s2geography, rather than the default Cartesian predicates. 
+ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use datafusion_common::{exec_datafusion_err, Result}; +use sedona_common::{sedona_internal_err, ExecutionMode, SpatialJoinOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_s2geography::{ + geography::{Geography, GeographyFactory}, + operator::{Op, OpType}, +}; +use sedona_spatial_join::{ + refine::IndexQueryResultRefinerFactory, + spatial_predicate::{RelationPredicate, SpatialRelationType}, + utils::init_once_array::InitOnceArray, + IndexQueryResult, IndexQueryResultRefiner, SpatialPredicate, +}; +use wkb::reader::Wkb; + +#[derive(Debug)] +pub(crate) struct GeographyRefiner { + op_type: OpType, + prepared_geoms: InitOnceArray<Option<Geography<'static>>>, + mem_used: AtomicUsize, +} + +impl GeographyRefiner { + pub fn new( + predicate: SpatialPredicate, + options: &SpatialJoinOptions, + num_build_geoms: usize, + ) -> Result<Self> { + let op_type = match predicate { + SpatialPredicate::Relation(RelationPredicate { + left: _, + right: _, + relation_type, + }) => match relation_type { + SpatialRelationType::Intersects => OpType::Intersects, + SpatialRelationType::Contains => OpType::Contains, + SpatialRelationType::Within => OpType::Within, + SpatialRelationType::Equals => OpType::Equals, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported relation type {relation_type}" + ) + } + }, + SpatialPredicate::Distance(_) => OpType::DWithin, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported predicate {predicate}" + ) + } + }; + + // If we Review Comment: Incomplete comment (`// If we`)—please either finish the thought or remove it to avoid confusion. ```suggestion ``` ########## rust/sedona-spatial-join-geography/src/join_provider.rs: ########## @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, Float64Array}; +use datafusion_common::{exec_datafusion_err, JoinType, Result, ScalarValue}; +use datafusion_physical_plan::ColumnarValue; +use geo_index::rtree::util::f64_box_to_f32; +use geo_types::{coord, Rect}; +use sedona_common::sedona_internal_err; +use sedona_expr::statistics::GeoStatistics; +use sedona_functions::executor::IterGeo; +use sedona_s2geography::{ + geography::{Geography, GeographyFactory}, + rect_bounder::RectBounder, +}; +use sedona_schema::datatypes::SedonaType; +use sedona_spatial_join::{ + index::{spatial_index_builder::SpatialJoinBuildMetrics, SpatialIndexBuilder}, + join_provider::SpatialJoinProvider, + operand_evaluator::{EvaluatedGeometryArray, EvaluatedGeometryArrayFactory}, + SpatialJoinOptions, SpatialPredicate, +}; + +use crate::spatial_index_builder::GeographySpatialIndexBuilder; + +#[derive(Debug)] +pub(crate) struct GeographyJoinProvider; + +impl SpatialJoinProvider for GeographyJoinProvider { + fn try_new_spatial_index_builder( + &self, + schema: arrow_schema::SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + ) -> Result<Box<dyn SpatialIndexBuilder>> { + // Create 
the inner (default) builder + let builder = GeographySpatialIndexBuilder::new( + schema, + spatial_predicate.clone(), + options, + join_type, + probe_threads_count, + metrics, + )?; + + Ok(Box::new(builder)) + } + + fn estimate_extra_memory_usage( + &self, + _geo_stats: &GeoStatistics, + _spatial_predicate: &SpatialPredicate, + _options: &SpatialJoinOptions, + ) -> usize { + // TODO: calculate + 0 + } + + fn evaluated_array_factory(&self) -> Arc<dyn EvaluatedGeometryArrayFactory> { + Arc::new(GeographyEvaluatedArrayFactory) + } +} + +#[derive(Debug)] +struct GeographyEvaluatedArrayFactory; + +impl EvaluatedGeometryArrayFactory for GeographyEvaluatedArrayFactory { + fn try_new_evaluated_array( + &self, + geometry_array: ArrayRef, + sedona_type: &SedonaType, + distance_columnar_value: Option<&ColumnarValue>, + ) -> Result<EvaluatedGeometryArray> { + // Without distance expansion use the impl without a bounder modifier + let Some(distance_columnar_value) = distance_columnar_value else { + return try_new_evaluated_array_impl(geometry_array, sedona_type, |_bounder| {}); + }; + + let result = match distance_columnar_value { + ColumnarValue::Scalar(ScalarValue::Float64(Some(distance))) => { + try_new_evaluated_array_impl(geometry_array, sedona_type, |bounder| { + bounder.expand_by_distance(*distance); + }) + } + ColumnarValue::Scalar(ScalarValue::Float64(None)) => { + todo!() Review Comment: `todo!()` will panic when the distance expression evaluates to NULL (e.g., ST_DWithin(..., NULL)). This should behave like the default implementation (produce NULL/empty rectangles so the join condition is not satisfied) rather than crashing. ```suggestion try_new_evaluated_array_impl(geometry_array, sedona_type, |bounder| { bounder.clear(); }) ``` ########## rust/sedona-spatial-join-geography/src/refiner.rs: ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Geography-specific refiner for spatial join predicate evaluation using s2geography. +//! +//! This module provides a refiner that evaluates spatial predicates on the sphere +//! using s2geography, rather than the default Cartesian predicates. + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use datafusion_common::{exec_datafusion_err, Result}; +use sedona_common::{sedona_internal_err, ExecutionMode, SpatialJoinOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_s2geography::{ + geography::{Geography, GeographyFactory}, + operator::{Op, OpType}, +}; +use sedona_spatial_join::{ + refine::IndexQueryResultRefinerFactory, + spatial_predicate::{RelationPredicate, SpatialRelationType}, + utils::init_once_array::InitOnceArray, + IndexQueryResult, IndexQueryResultRefiner, SpatialPredicate, +}; +use wkb::reader::Wkb; + +#[derive(Debug)] +pub(crate) struct GeographyRefiner { + op_type: OpType, + prepared_geoms: InitOnceArray<Option<Geography<'static>>>, + mem_used: AtomicUsize, +} + +impl GeographyRefiner { + pub fn new( + predicate: SpatialPredicate, + options: &SpatialJoinOptions, + num_build_geoms: usize, + ) -> Result<Self> { + let op_type = match predicate { + SpatialPredicate::Relation(RelationPredicate { + left: _, + right: _, + 
relation_type, + }) => match relation_type { + SpatialRelationType::Intersects => OpType::Intersects, + SpatialRelationType::Contains => OpType::Contains, + SpatialRelationType::Within => OpType::Within, + SpatialRelationType::Equals => OpType::Equals, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported relation type {relation_type}" + ) + } + }, + SpatialPredicate::Distance(_) => OpType::DWithin, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported predicate {predicate}" + ) + } + }; + + // If we + let prepared_geoms = if matches!( + options.execution_mode, + ExecutionMode::PrepareNone | ExecutionMode::PrepareProbe + ) { Review Comment: ExecutionMode handling looks inverted: this allocates `prepared_geoms` when execution_mode is `PrepareNone`/`PrepareProbe` and disables preparation for `PrepareBuild`. As a result, `actual_execution_mode()` will also report the opposite of what was requested. Allocate `prepared_geoms` only when the selected mode is `PrepareBuild` (and decide what to do for `Speculative`). ```suggestion let prepared_geoms = if matches!(options.execution_mode, ExecutionMode::PrepareBuild) { ``` ########## rust/sedona-spatial-join-geography/src/refiner.rs: ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Geography-specific refiner for spatial join predicate evaluation using s2geography. +//! +//! This module provides a refiner that evaluates spatial predicates on the sphere +//! using s2geography, rather than the default Cartesian predicates. + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use datafusion_common::{exec_datafusion_err, Result}; +use sedona_common::{sedona_internal_err, ExecutionMode, SpatialJoinOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_s2geography::{ + geography::{Geography, GeographyFactory}, + operator::{Op, OpType}, +}; +use sedona_spatial_join::{ + refine::IndexQueryResultRefinerFactory, + spatial_predicate::{RelationPredicate, SpatialRelationType}, + utils::init_once_array::InitOnceArray, + IndexQueryResult, IndexQueryResultRefiner, SpatialPredicate, +}; +use wkb::reader::Wkb; + +#[derive(Debug)] +pub(crate) struct GeographyRefiner { + op_type: OpType, + prepared_geoms: InitOnceArray<Option<Geography<'static>>>, + mem_used: AtomicUsize, +} + +impl GeographyRefiner { + pub fn new( + predicate: SpatialPredicate, + options: &SpatialJoinOptions, + num_build_geoms: usize, + ) -> Result<Self> { + let op_type = match predicate { + SpatialPredicate::Relation(RelationPredicate { + left: _, + right: _, + relation_type, + }) => match relation_type { + SpatialRelationType::Intersects => OpType::Intersects, + SpatialRelationType::Contains => OpType::Contains, + SpatialRelationType::Within => OpType::Within, + SpatialRelationType::Equals => OpType::Equals, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported relation type {relation_type}" + ) + } + }, + SpatialPredicate::Distance(_) => OpType::DWithin, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported predicate {predicate}" + ) + } + }; + + // If we + let prepared_geoms = if matches!( + 
options.execution_mode, ExecutionMode::PrepareNone | ExecutionMode::PrepareProbe + ) { + InitOnceArray::new(num_build_geoms) + } else { + InitOnceArray::new(0) + }; + + let mem_used = AtomicUsize::new(prepared_geoms.allocated_size()); + + Ok(Self { + op_type, + prepared_geoms, + mem_used, + }) + } +} + +impl IndexQueryResultRefiner for GeographyRefiner { + fn refine( + &self, + probe: &Wkb<'_>, + index_query_results: &[IndexQueryResult], + ) -> Result<Vec<(i32, i32)>> { + let mut results = Vec::with_capacity(index_query_results.len()); + let mut op = Op::new(self.op_type); + + // We may want thread local factories/build_geog/probe_geog here + let mut factory = GeographyFactory::new(); + let mut probe_geog = factory + .from_wkb(probe.buf()) + .map_err(|e| exec_datafusion_err!("{e}"))?; + + let mut build_geog = Geography::new(); + + // Crude heuristic used by the S2Loop (build an index after 20 unindexed + // contains queries even for small looops). + if probe.buf().len() > (32 * 2 * size_of::<f64>()) || index_query_results.len() > 20 { Review Comment: This uses `size_of::<f64>()` without importing it or qualifying it as `std::mem::size_of`. `size_of` has only been in the prelude since Rust 1.80, so this will not compile on older toolchains (e.g. if the crate's MSRV is below 1.80). Qualifying it as `std::mem::size_of` compiles everywhere. ```suggestion if probe.buf().len() > (32 * 2 * std::mem::size_of::<f64>()) || index_query_results.len() > 20 { ``` ########## rust/sedona-spatial-join-geography/src/refiner.rs: ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Geography-specific refiner for spatial join predicate evaluation using s2geography. +//! +//! This module provides a refiner that evaluates spatial predicates on the sphere +//! using s2geography, rather than the default Cartesian predicates. + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use datafusion_common::{exec_datafusion_err, Result}; +use sedona_common::{sedona_internal_err, ExecutionMode, SpatialJoinOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_s2geography::{ + geography::{Geography, GeographyFactory}, + operator::{Op, OpType}, +}; +use sedona_spatial_join::{ + refine::IndexQueryResultRefinerFactory, + spatial_predicate::{RelationPredicate, SpatialRelationType}, + utils::init_once_array::InitOnceArray, + IndexQueryResult, IndexQueryResultRefiner, SpatialPredicate, +}; +use wkb::reader::Wkb; + +#[derive(Debug)] +pub(crate) struct GeographyRefiner { + op_type: OpType, + prepared_geoms: InitOnceArray<Option<Geography<'static>>>, + mem_used: AtomicUsize, +} + +impl GeographyRefiner { + pub fn new( + predicate: SpatialPredicate, + options: &SpatialJoinOptions, + num_build_geoms: usize, + ) -> Result<Self> { + let op_type = match predicate { + SpatialPredicate::Relation(RelationPredicate { + left: _, + right: _, + relation_type, + }) => match relation_type { + SpatialRelationType::Intersects => OpType::Intersects, + SpatialRelationType::Contains => OpType::Contains, + SpatialRelationType::Within => OpType::Within, + SpatialRelationType::Equals => OpType::Equals, + _ => { + return 
sedona_internal_err!( + "GeographyRefiner created with unsupported relation type {relation_type}" + ) + } + }, + SpatialPredicate::Distance(_) => OpType::DWithin, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported predicate {predicate}" + ) + } + }; + + // If we + let prepared_geoms = if matches!( + options.execution_mode, + ExecutionMode::PrepareNone | ExecutionMode::PrepareProbe + ) { + InitOnceArray::new(num_build_geoms) + } else { + InitOnceArray::new(0) + }; + + let mem_used = AtomicUsize::new(prepared_geoms.allocated_size()); + + Ok(Self { + op_type, + prepared_geoms, + mem_used, + }) + } +} + +impl IndexQueryResultRefiner for GeographyRefiner { + fn refine( + &self, + probe: &Wkb<'_>, + index_query_results: &[IndexQueryResult], + ) -> Result<Vec<(i32, i32)>> { + let mut results = Vec::with_capacity(index_query_results.len()); + let mut op = Op::new(self.op_type); + + // We may want thread local factories/build_geog/probe_geog here + let mut factory = GeographyFactory::new(); + let mut probe_geog = factory + .from_wkb(probe.buf()) + .map_err(|e| exec_datafusion_err!("{e}"))?; + + let mut build_geog = Geography::new(); + + // Crude heuristic used by the S2Loop (build an index after 20 unindexed + // contains queries even for small looops). Review Comment: Spelling in comment: "looops" → "loops". ```suggestion // contains queries even for small loops). ``` ########## rust/sedona/src/context.rs: ########## @@ -160,6 +160,18 @@ impl SedonaContext { planner = planner.with_spatial_join_physical_planner(Arc::new( DefaultSpatialJoinPhysicalPlanner::new(), )); + + // Register the geography join after the default planer + // If a query is not supported, it falls back to the default planner. + #[cfg(feature = "s2geography")] Review Comment: Spelling: comment says "default planer"; should be "default planner". 
########## rust/sedona-spatial-join-geography/src/refiner.rs: ########## @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Geography-specific refiner for spatial join predicate evaluation using s2geography. +//! +//! This module provides a refiner that evaluates spatial predicates on the sphere +//! using s2geography, rather than the default Cartesian predicates. 
+ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use datafusion_common::{exec_datafusion_err, Result}; +use sedona_common::{sedona_internal_err, ExecutionMode, SpatialJoinOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_s2geography::{ + geography::{Geography, GeographyFactory}, + operator::{Op, OpType}, +}; +use sedona_spatial_join::{ + refine::IndexQueryResultRefinerFactory, + spatial_predicate::{RelationPredicate, SpatialRelationType}, + utils::init_once_array::InitOnceArray, + IndexQueryResult, IndexQueryResultRefiner, SpatialPredicate, +}; +use wkb::reader::Wkb; + +#[derive(Debug)] +pub(crate) struct GeographyRefiner { + op_type: OpType, + prepared_geoms: InitOnceArray<Option<Geography<'static>>>, + mem_used: AtomicUsize, +} + +impl GeographyRefiner { + pub fn new( + predicate: SpatialPredicate, + options: &SpatialJoinOptions, + num_build_geoms: usize, + ) -> Result<Self> { + let op_type = match predicate { + SpatialPredicate::Relation(RelationPredicate { + left: _, + right: _, + relation_type, + }) => match relation_type { + SpatialRelationType::Intersects => OpType::Intersects, + SpatialRelationType::Contains => OpType::Contains, + SpatialRelationType::Within => OpType::Within, + SpatialRelationType::Equals => OpType::Equals, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported relation type {relation_type}" + ) + } + }, + SpatialPredicate::Distance(_) => OpType::DWithin, + _ => { + return sedona_internal_err!( + "GeographyRefiner created with unsupported predicate {predicate}" + ) + } + }; + + // If we + let prepared_geoms = if matches!( + options.execution_mode, + ExecutionMode::PrepareNone | ExecutionMode::PrepareProbe + ) { + InitOnceArray::new(num_build_geoms) + } else { + InitOnceArray::new(0) + }; + + let mem_used = AtomicUsize::new(prepared_geoms.allocated_size()); + + Ok(Self { + op_type, + prepared_geoms, + mem_used, + }) + } +} + +impl IndexQueryResultRefiner for GeographyRefiner 
{ + fn refine( + &self, + probe: &Wkb<'_>, + index_query_results: &[IndexQueryResult], + ) -> Result<Vec<(i32, i32)>> { + let mut results = Vec::with_capacity(index_query_results.len()); + let mut op = Op::new(self.op_type); + + // We may want thread local factories/build_geog/probe_geog here + let mut factory = GeographyFactory::new(); + let mut probe_geog = factory + .from_wkb(probe.buf()) + .map_err(|e| exec_datafusion_err!("{e}"))?; + + let mut build_geog = Geography::new(); + + // Crude heuristic used by the S2Loop (build an index after 20 unindexed + // contains queries even for small looops). + if probe.buf().len() > (32 * 2 * size_of::<f64>()) || index_query_results.len() > 20 { + probe_geog + .prepare() + .map_err(|e| exec_datafusion_err!("{e}"))?; + } + + if !self.prepared_geoms.is_empty() { + // We're in prepared build mode + for result in index_query_results { + let (prepared_build_geom, is_newly_inserted) = + self.prepared_geoms.get_or_create(result.geom_idx, || { + // Basically, prepare anything except points on the build side + if result.wkb.buf().len() > 32 { + let mut geog = factory + .from_wkb(result.wkb.buf()) + .map_err(|e| exec_datafusion_err!("{e}"))?; + geog.prepare().map_err(|e| exec_datafusion_err!("{e}"))?; + Ok(Some(unsafe { + // Safety: the evaluated batches keep the required WKB alive Review Comment: The `unsafe` transmute extends `Geography`'s lifetime to `'static` even though it is created from non-owning WKB bytes. Please add a more complete SAFETY comment documenting the required invariants (e.g., that the underlying WKB buffers are kept alive for at least as long as this cached Geography, across spills/partitioning), or switch to a design that ties the cache entry lifetime to an owned buffer/Arc so this doesn't rely on `transmute`. ```suggestion // SAFETY: `GeographyFactory::from_wkb` creates a `Geography` // that borrows the bytes in `result.wkb.buf()`. 
We cache the // prepared geography in `self.prepared_geoms`, so extending that // borrow to `'static` is only sound if the backing WKB buffer is // guaranteed to remain alive and unchanged for at least as long as // this cache entry can be observed and used. // // Required invariants: // * the bytes referenced by `result.wkb.buf()` are owned by // storage whose lifetime covers the entire lifetime of the // corresponding entry in `self.prepared_geoms`; // * that storage is not freed, reallocated, or mutated in a way // that would invalidate interior references held by `geog`; // * the guarantee above continues to hold across all execution // paths that may reuse this cache entry, including batch // boundaries, partitioning/repartitioning, spilling, and any // other movement of records between operators or tasks. // // If those invariants do not hold, this transmute is unsound and // the cache must instead own the WKB bytes (for example via an // owned buffer or `Arc`) so the `Geography` lifetime is tied to // owned storage rather than being widened with `transmute`. ``` ########## rust/sedona/src/context.rs: ########## @@ -160,6 +160,18 @@ impl SedonaContext { planner = planner.with_spatial_join_physical_planner(Arc::new( DefaultSpatialJoinPhysicalPlanner::new(), )); + + // Register the geography join after the default planer + // If a query is not supported, it falls back to the default planner. + #[cfg(feature = "s2geography")] + { + use sedona_spatial_join_geography::physical_planner::GeographySpatialJoinPhysicalPlanner; + + planner = planner.with_spatial_join_physical_planner(Arc::new( + GeographySpatialJoinPhysicalPlanner::new(), + )); + } + // Register the GPU join after the default planer // If a query is not supported, it falls back to the default planner. Review Comment: Spelling: comment says "default planer"; should be "default planner". 
########## rust/sedona-spatial-join-geography/src/join_provider.rs: ########## @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, Float64Array}; +use datafusion_common::{exec_datafusion_err, JoinType, Result, ScalarValue}; +use datafusion_physical_plan::ColumnarValue; +use geo_index::rtree::util::f64_box_to_f32; +use geo_types::{coord, Rect}; +use sedona_common::sedona_internal_err; +use sedona_expr::statistics::GeoStatistics; +use sedona_functions::executor::IterGeo; +use sedona_s2geography::{ + geography::{Geography, GeographyFactory}, + rect_bounder::RectBounder, +}; +use sedona_schema::datatypes::SedonaType; +use sedona_spatial_join::{ + index::{spatial_index_builder::SpatialJoinBuildMetrics, SpatialIndexBuilder}, + join_provider::SpatialJoinProvider, + operand_evaluator::{EvaluatedGeometryArray, EvaluatedGeometryArrayFactory}, + SpatialJoinOptions, SpatialPredicate, +}; + +use crate::spatial_index_builder::GeographySpatialIndexBuilder; + +#[derive(Debug)] +pub(crate) struct GeographyJoinProvider; + +impl SpatialJoinProvider for GeographyJoinProvider { + fn try_new_spatial_index_builder( + &self, + schema: arrow_schema::SchemaRef, + 
spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + ) -> Result<Box<dyn SpatialIndexBuilder>> { + // Create the inner (default) builder + let builder = GeographySpatialIndexBuilder::new( + schema, + spatial_predicate.clone(), + options, + join_type, + probe_threads_count, + metrics, + )?; + + Ok(Box::new(builder)) + } + + fn estimate_extra_memory_usage( + &self, + _geo_stats: &GeoStatistics, + _spatial_predicate: &SpatialPredicate, + _options: &SpatialJoinOptions, + ) -> usize { + // TODO: calculate + 0 + } Review Comment: `estimate_extra_memory_usage()` currently returns 0. This feeds into the build-side memory reservation logic; returning 0 can significantly underestimate memory needed for the geography refiner/prepared geoms and lead to OOM/spill behavior. Consider estimating via `GeographyRefinerFactory.create_refiner(...).estimate_max_memory_usage(geo_stats)` plus the default R-tree estimate, similar to `DefaultSpatialIndexBuilder::estimate_extra_memory_usage()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
