Copilot commented on code in PR #775:
URL: https://github.com/apache/sedona-db/pull/775#discussion_r3133495877
##########
rust/sedona-spatial-join/src/operand_evaluator.rs:
##########
@@ -60,13 +61,67 @@ pub trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
#[derive(Debug)]
pub(crate) struct DefaultGeometryArrayFactory;
+impl DefaultGeometryArrayFactory {
+ fn expand_evaluated_array_by_distance(
+ &self,
+ result: &mut EvaluatedGeometryArray,
+ distance_columnar_value: &ColumnarValue,
+ ) -> Result<()> {
+ // Expand the vec by distance
+
+ // No timezone conversion needed for distance; pass None as cast_options explicitly.
+ let distance_columnar_value =
distance_columnar_value.cast_to(&DataType::Float64, None)?;
+ match &distance_columnar_value {
+ ColumnarValue::Scalar(ScalarValue::Float64(Some(distance))) => {
+ result.rects.iter_mut().for_each(|rect_opt| {
+ if let Some(rect) = rect_opt {
+ expand_rect_in_place(rect, *distance);
+ };
+ });
+ }
+ ColumnarValue::Scalar(ScalarValue::Float64(None)) => {
+ // Distance expression evaluates to NULL, the resulting distance should be NULL as well.
+ result.rects.clear();
+ }
+ ColumnarValue::Array(array) => {
+ if let Some(array) =
array.as_any().downcast_ref::<Float64Array>() {
+ for (geom_idx, rect_opt) in
result.rects.iter_mut().enumerate() {
+ if !array.is_null(geom_idx) {
+ let dist = array.value(geom_idx);
+ if let Some(rect) = rect_opt {
+ expand_rect_in_place(rect, dist);
+ };
+ }
+ }
Review Comment:
For `ColumnarValue::Array`, NULL distance entries are currently skipped (no
rect modification). For distance predicates, a NULL distance should behave like
“no match” for that row, so leaving the original rect can create false
candidates and extra work. Consider setting the corresponding `rect_opt` to
`None` when `array.is_null(geom_idx)` (while keeping the vector length
unchanged).
##########
rust/sedona-spatial-join/src/operand_evaluator.rs:
##########
@@ -60,13 +61,67 @@ pub trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
#[derive(Debug)]
pub(crate) struct DefaultGeometryArrayFactory;
+impl DefaultGeometryArrayFactory {
+ fn expand_evaluated_array_by_distance(
+ &self,
+ result: &mut EvaluatedGeometryArray,
+ distance_columnar_value: &ColumnarValue,
+ ) -> Result<()> {
+ // Expand the vec by distance
+
+ // No timezone conversion needed for distance; pass None as cast_options explicitly.
+ let distance_columnar_value =
distance_columnar_value.cast_to(&DataType::Float64, None)?;
+ match &distance_columnar_value {
+ ColumnarValue::Scalar(ScalarValue::Float64(Some(distance))) => {
+ result.rects.iter_mut().for_each(|rect_opt| {
+ if let Some(rect) = rect_opt {
+ expand_rect_in_place(rect, *distance);
+ };
+ });
+ }
+ ColumnarValue::Scalar(ScalarValue::Float64(None)) => {
+ // Distance expression evaluates to NULL, the resulting distance should be NULL as well.
+ result.rects.clear();
Review Comment:
`ColumnarValue::Scalar(ScalarValue::Float64(None))` currently does
`result.rects.clear()`, which makes `rects.len()` differ from
`geometry_array.len()`. Downstream code assumes these lengths match (e.g.,
evaluated batch spilling iterates `geom_array.rects()` while building an array
of `num_rows`), so this can break spilling / index building for distance joins
with NULL distances. Prefer preserving the length of `rects` and setting all entries
to `None` (or otherwise ensuring that its length stays equal to `num_rows`).
```suggestion
// Distance expression evaluates to NULL, so preserve row alignment and mark
// every expanded rectangle as NULL rather than shrinking the vector.
result.rects.iter_mut().for_each(|rect_opt| *rect_opt = None);
```
##########
rust/sedona-spatial-join-geography/src/refiner.rs:
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Geography-specific refiner for spatial join predicate evaluation using s2geography.
+//!
+//! This module provides a refiner that evaluates spatial predicates on the sphere
+//! using s2geography, rather than the default Cartesian predicates.
+
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+
+use datafusion_common::{exec_datafusion_err, Result};
+use sedona_common::{sedona_internal_err, ExecutionMode, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_s2geography::{
+ geography::{Geography, GeographyFactory},
+ operator::{Op, OpType},
+};
+use sedona_spatial_join::{
+ refine::IndexQueryResultRefinerFactory,
+ spatial_predicate::{RelationPredicate, SpatialRelationType},
+ utils::init_once_array::InitOnceArray,
+ IndexQueryResult, IndexQueryResultRefiner, SpatialPredicate,
+};
+use wkb::reader::Wkb;
+
+#[derive(Debug)]
+pub(crate) struct GeographyRefiner {
+ op_type: OpType,
+ prepared_geoms: InitOnceArray<Option<Geography<'static>>>,
+ mem_used: AtomicUsize,
+}
+
+impl GeographyRefiner {
+ pub fn new(
+ predicate: SpatialPredicate,
+ options: &SpatialJoinOptions,
+ num_build_geoms: usize,
+ ) -> Result<Self> {
+ let op_type = match predicate {
+ SpatialPredicate::Relation(RelationPredicate {
+ left: _,
+ right: _,
+ relation_type,
+ }) => match relation_type {
+ SpatialRelationType::Intersects => OpType::Intersects,
+ SpatialRelationType::Contains => OpType::Contains,
+ SpatialRelationType::Within => OpType::Within,
+ SpatialRelationType::Equals => OpType::Equals,
+ _ => {
+ return sedona_internal_err!(
+ "GeographyRefiner created with unsupported relation type {relation_type}"
+ )
+ }
+ },
+ SpatialPredicate::Distance(_) => OpType::DWithin,
+ _ => {
+ return sedona_internal_err!(
+ "GeographyRefiner created with unsupported predicate {predicate}"
+ )
+ }
+ };
+
+ // Allow join options to turn off preparedness
+ let prepared_geoms = if !matches!(
+ options.execution_mode,
+ ExecutionMode::PrepareNone | ExecutionMode::PrepareProbe
+ ) {
+ InitOnceArray::new(num_build_geoms)
+ } else {
+ InitOnceArray::new(0)
+ };
+
+ let mem_used = AtomicUsize::new(prepared_geoms.allocated_size());
+
+ Ok(Self {
+ op_type,
+ prepared_geoms,
+ mem_used,
+ })
+ }
+}
+
+impl IndexQueryResultRefiner for GeographyRefiner {
+ fn refine(
+ &self,
+ probe: &Wkb<'_>,
+ index_query_results: &[IndexQueryResult],
+ ) -> Result<Vec<(i32, i32)>> {
+ let mut results = Vec::with_capacity(index_query_results.len());
+ let mut op = Op::new(self.op_type);
+
+ // We may want thread local factories/build_geog/probe_geog here
+ let mut factory = GeographyFactory::new();
+ let mut probe_geog = factory
+ .from_wkb(probe.buf())
+ .map_err(|e| exec_datafusion_err!("{e}"))?;
+
+ let mut build_geog = Geography::new();
+
+ // Crude heuristic used by the S2Loop (build an index after 20 unindexed
+ // contains queries even for small loops).
+ if probe.buf().len() > (32 * 2 * size_of::<f64>()) ||
index_query_results.len() > 20 {
+ probe_geog
+ .prepare()
+ .map_err(|e| exec_datafusion_err!("{e}"))?;
+ }
+
+ if !self.prepared_geoms.is_empty() {
+ // We're in prepared build mode
+ for result in index_query_results {
+ let (prepared_build_geom, is_newly_inserted) =
+ self.prepared_geoms.get_or_create(result.geom_idx, || {
+ // Basically, prepare anything except points on the build side
+ if result.wkb.buf().len() > 32 {
+ let mut geog = factory
+ .from_wkb(result.wkb.buf())
+ .map_err(|e| exec_datafusion_err!("{e}"))?;
+ geog.prepare().map_err(|e|
exec_datafusion_err!("{e}"))?;
+ Ok(Some(unsafe {
+ // Safety: the evaluated batches keep the required WKB alive. The
+ // refiner always outlives the evaluated batches.
+ std::mem::transmute::<Geography<'_>,
Geography<'static>>(geog)
+ }))
+ } else {
+ Ok(None)
+ }
+ })?;
+
+ let build_geog_ref = if let Some(prepared_geog) =
prepared_build_geom {
+ if is_newly_inserted {
+ self.mem_used
+ .fetch_add(prepared_geog.mem_used(),
Ordering::Relaxed);
+ }
+
+ prepared_geog
+ } else {
+ factory
+ .init_from_wkb(result.wkb.buf(), &mut build_geog)
+ .map_err(|e| exec_datafusion_err!("{e}"))?;
+ &build_geog
+ };
+
+ let eval = if matches!(self.op_type, OpType::DWithin) {
+ op.eval_binary_distance_predicate(
+ build_geog_ref,
+ &probe_geog,
+ result.distance.unwrap_or(f64::INFINITY),
+ )
+ } else {
Review Comment:
For `OpType::DWithin`, `result.distance.unwrap_or(f64::INFINITY)` treats a
missing/NULL distance as “infinite”, which makes the predicate trivially true
and can produce incorrect join results. Other refiners evaluate the predicate as
`false` when the distance is `None`. Consider treating the candidate pair as a
non-match when `result.distance` is `None` (and/or ensure that NULL distances never
reach refinement).
##########
rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs:
##########
@@ -88,26 +89,37 @@ impl DefaultSpatialIndexBuilder {
join_type,
probe_threads_count,
metrics,
+ refiner_factory: Arc::new(DefaultIndexQueryResultRefinerFactory),
indexed_batches: Vec::new(),
stats: GeoStatistics::empty(),
memory_used: 0,
})
}
- pub(crate) fn estimate_extra_memory_usage(
+ pub fn with_refiner_factory(
+ mut self,
+ refiner_factory: Arc<dyn IndexQueryResultRefinerFactory>,
+ ) -> Self {
+ self.refiner_factory = refiner_factory;
+ self
+ }
+
+ pub fn estimate_extra_memory_usage(
geo_stats: &GeoStatistics,
spatial_predicate: &SpatialPredicate,
options: &SpatialJoinOptions,
+ refiner_factory: Arc<dyn IndexQueryResultRefinerFactory>,
) -> usize {
// Estimate the amount of memory needed by the refiner
let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
- let refiner = create_refiner(
- options.spatial_library,
- spatial_predicate,
- options.clone(),
- num_geoms,
- geo_stats.clone(),
- );
+ let refiner = refiner_factory
+ .create_refiner(
+ spatial_predicate,
+ options.clone(),
+ num_geoms,
+ geo_stats.clone(),
+ )
+ .expect("Refiner can be constructed");
let refiner_mem_usage = refiner.estimate_max_memory_usage(geo_stats);
Review Comment:
`estimate_extra_memory_usage()` uses `.expect("Refiner can be constructed")`
on the refiner factory result. Since
`IndexQueryResultRefinerFactory::create_refiner()` is now part of the extension
surface and returns `Result`, this introduces a possible runtime panic (e.g.,
custom factories or unsupported predicates/options). Prefer propagating the
error (change signature to return `Result<usize>`) or falling back to a
conservative estimate when refiner construction fails.
##########
rust/sedona-spatial-join/src/utils/init_once_array.rs:
##########
@@ -26,7 +26,8 @@ use std::sync::atomic::{AtomicPtr, Ordering};
///
/// Uses atomic pointers for lock-free operations with proper memory ordering.
/// Null pointers indicate empty slots.
-pub(crate) struct InitOnceArray<T: Send + Sync> {
+#[derive(Debug)]
+pub struct InitOnceArray<T: Send + Sync> {
ptrs: Vec<AtomicPtr<T>>,
Review Comment:
`InitOnceArray` is now public, but several methods are documented as
“internal API” and claim out-of-bounds access returns `None`, while the
implementation will panic via `self.ptrs[index]`. If this is intended to be a
public utility for extension crates, the API/docs should be made consistent
(either add bounds-checked variants / return `Option`/`Result`, or keep the
type `pub(crate)` and/or clearly document the panic behavior without mentioning
out-of-bounds returning `None`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]