This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 40e1460d refactor(rust/sedona-query-planner): Move query planner and
utilities to dedicated crate (#735)
40e1460d is described below
commit 40e1460d15ea64d359b66fa7c070921f24846476
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Apr 7 11:01:54 2026 -0500
refactor(rust/sedona-query-planner): Move query planner and utilities to
dedicated crate (#735)
---
Cargo.lock | 20 ++
Cargo.toml | 2 +
rust/sedona-query-planner/Cargo.toml | 45 +++
.../src/lib.rs | 33 +-
.../src}/logical_plan_node.rs | 2 +-
.../src}/optimizer.rs | 6 +-
.../src}/probe_shuffle_exec.rs | 0
rust/sedona-query-planner/src/query_planner.rs | 93 ++++++
.../src}/spatial_expr_utils.rs | 130 +-------
.../src/spatial_join_physical_planner.rs | 252 +++++++++++++++
.../src/spatial_predicate.rs | 0
rust/sedona-spatial-join/Cargo.toml | 1 +
rust/sedona-spatial-join/src/index.rs | 9 +-
.../src/index/default_spatial_index.rs | 1 -
.../sedona-spatial-join/src/index/spatial_index.rs | 3 +-
.../src/index/spatial_index_builder.rs | 8 +-
rust/sedona-spatial-join/src/join_provider.rs | 2 +-
rust/sedona-spatial-join/src/lib.rs | 12 +-
rust/sedona-spatial-join/src/operand_evaluator.rs | 2 +-
rust/sedona-spatial-join/src/physical_planner.rs | 332 +++++++++++++++++++
rust/sedona-spatial-join/src/planner.rs | 47 ---
.../src/planner/physical_planner.rs | 360 ---------------------
.../tests/spatial_join_integration.rs | 14 +-
rust/sedona/Cargo.toml | 1 +
rust/sedona/src/context.rs | 13 +-
25 files changed, 801 insertions(+), 587 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 3bafcc4f..567c1a10 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5201,6 +5201,7 @@ dependencies = [
"sedona-geos",
"sedona-pointcloud",
"sedona-proj",
+ "sedona-query-planner",
"sedona-raster-functions",
"sedona-s2geography",
"sedona-schema",
@@ -5602,6 +5603,24 @@ dependencies = [
"wkb",
]
+[[package]]
+name = "sedona-query-planner"
+version = "0.4.0"
+dependencies = [
+ "arrow",
+ "arrow-schema",
+ "async-trait",
+ "datafusion",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-schema",
+]
+
[[package]]
name = "sedona-raster"
version = "0.4.0"
@@ -5715,6 +5734,7 @@ dependencies = [
"sedona-geo-traits-ext",
"sedona-geometry",
"sedona-geos",
+ "sedona-query-planner",
"sedona-schema",
"sedona-testing",
"sedona-tg",
diff --git a/Cargo.toml b/Cargo.toml
index 9237305c..478d2ae8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ members = [
"rust/sedona-raster-functions",
"rust/sedona-schema",
"rust/sedona-spatial-join",
+ "rust/sedona-query-planner",
"rust/sedona-testing",
"rust/sedona",
"sedona-cli",
@@ -146,6 +147,7 @@ sedona-raster = { version = "0.4.0", path =
"rust/sedona-raster" }
sedona-raster-functions = { version = "0.4.0", path =
"rust/sedona-raster-functions" }
sedona-schema = { version = "0.4.0", path = "rust/sedona-schema" }
sedona-spatial-join = { version = "0.4.0", path = "rust/sedona-spatial-join" }
+sedona-query-planner = { version = "0.4.0", path = "rust/sedona-query-planner"
}
sedona-testing = { version = "0.4.0", path = "rust/sedona-testing" }
# C wrapper crates
diff --git a/rust/sedona-query-planner/Cargo.toml
b/rust/sedona-query-planner/Cargo.toml
new file mode 100644
index 00000000..95ace915
--- /dev/null
+++ b/rust/sedona-query-planner/Cargo.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-query-planner"
+version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description = "Query planner and common traits for spatial join operations in
Apache SedonaDB"
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+sedona-common = { workspace = true }
+sedona-expr = { workspace = true }
+sedona-schema = { workspace = true }
+
+[dev-dependencies]
+arrow = { workspace = true }
+datafusion = { workspace = true }
+sedona-schema = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/lib.rs
b/rust/sedona-query-planner/src/lib.rs
similarity index 53%
copy from rust/sedona-spatial-join/src/lib.rs
copy to rust/sedona-query-planner/src/lib.rs
index 372efb7f..4bec6d2b 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-query-planner/src/lib.rs
@@ -15,31 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-pub mod evaluated_batch;
-pub mod exec;
-mod index;
-mod join_provider;
-pub mod operand_evaluator;
-pub mod partitioning;
-pub mod planner;
-mod prepare;
-mod probe;
-pub mod refine;
+mod logical_plan_node;
+pub mod optimizer;
+pub mod probe_shuffle_exec;
+pub mod query_planner;
+mod spatial_expr_utils;
+pub mod spatial_join_physical_planner;
pub mod spatial_predicate;
-mod stream;
-pub mod utils;
-
-pub use exec::SpatialJoinExec;
-
-// Re-export function for register the spatial join planner
-pub use planner::register_planner;
-
-// Re-export ProbeShuffleExec so that integration tests (and other crates) can
verify
-// its presence in optimized physical plans.
-pub use planner::probe_shuffle_exec::ProbeShuffleExec;
-
-// Re-export types needed for external usage (e.g., in Comet)
-pub use spatial_predicate::SpatialPredicate;
-
-// Re-export option types from sedona-common for convenience
-pub use sedona_common::option::*;
diff --git a/rust/sedona-spatial-join/src/planner/logical_plan_node.rs
b/rust/sedona-query-planner/src/logical_plan_node.rs
similarity index 99%
rename from rust/sedona-spatial-join/src/planner/logical_plan_node.rs
rename to rust/sedona-query-planner/src/logical_plan_node.rs
index 23f35d09..7a9bf952 100644
--- a/rust/sedona-spatial-join/src/planner/logical_plan_node.rs
+++ b/rust/sedona-query-planner/src/logical_plan_node.rs
@@ -29,7 +29,7 @@ use sedona_common::sedona_internal_err;
/// Carries a join's inputs and filter expression so the physical planner can
recognize and plan
/// a `SpatialJoinExec`.
#[derive(PartialEq, Eq, Hash)]
-pub(crate) struct SpatialJoinPlanNode {
+pub struct SpatialJoinPlanNode {
pub left: LogicalPlan,
pub right: LogicalPlan,
pub join_type: JoinType,
diff --git a/rust/sedona-spatial-join/src/planner/optimizer.rs
b/rust/sedona-query-planner/src/optimizer.rs
similarity index 99%
rename from rust/sedona-spatial-join/src/planner/optimizer.rs
rename to rust/sedona-query-planner/src/optimizer.rs
index bacaf7a9..7a669038 100644
--- a/rust/sedona-spatial-join/src/planner/optimizer.rs
+++ b/rust/sedona-query-planner/src/optimizer.rs
@@ -16,8 +16,8 @@
// under the License.
use std::sync::Arc;
-use crate::planner::logical_plan_node::SpatialJoinPlanNode;
-use crate::planner::spatial_expr_utils::{
+use crate::logical_plan_node::SpatialJoinPlanNode;
+use crate::spatial_expr_utils::{
collect_spatial_predicate_names, find_knn_query_side, KNNJoinQuerySide,
};
use datafusion::execution::session_state::SessionStateBuilder;
@@ -52,7 +52,7 @@ use sedona_common::{sedona_internal_datafusion_err,
sedona_internal_err};
///
/// - `SpatialJoinLogicalRewrite` is appended at the end so that non-KNN
spatial joins still
/// benefit from filter pushdown before being converted to extension nodes.
-pub(crate) fn register_spatial_join_logical_optimizer(
+pub fn register_spatial_join_logical_optimizer(
mut session_state_builder: SessionStateBuilder,
) -> Result<SessionStateBuilder> {
let optimizer = session_state_builder
diff --git a/rust/sedona-spatial-join/src/planner/probe_shuffle_exec.rs
b/rust/sedona-query-planner/src/probe_shuffle_exec.rs
similarity index 100%
rename from rust/sedona-spatial-join/src/planner/probe_shuffle_exec.rs
rename to rust/sedona-query-planner/src/probe_shuffle_exec.rs
diff --git a/rust/sedona-query-planner/src/query_planner.rs
b/rust/sedona-query-planner/src/query_planner.rs
new file mode 100644
index 00000000..8fd2551d
--- /dev/null
+++ b/rust/sedona-query-planner/src/query_planner.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Query planner that delegates to DataFusion's [`DefaultPhysicalPlanner`]
+//! with a configurable set of [`ExtensionPlanner`]s.
+
+use std::fmt;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::execution::context::QueryPlanner;
+use datafusion::execution::session_state::SessionState;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner,
PhysicalPlanner};
+use datafusion_common::Result;
+use datafusion_expr::LogicalPlan;
+
+use crate::spatial_join_physical_planner::{
+ SpatialJoinExtensionPlanner, SpatialJoinPhysicalPlanner,
+};
+
+/// Query planner that wraps DataFusion's [`DefaultPhysicalPlanner`] with a set
+/// of extension planners that handle custom logical nodes (e.g. spatial joins).
+///
+/// At present the only extension planner is a [`SpatialJoinExtensionPlanner`],
+/// which plans spatial join extension nodes by consulting the registered
+/// [`SpatialJoinPhysicalPlanner`]s.
+pub struct SedonaQueryPlanner {
+ /// Extension planner for spatial join nodes; physical planners are added
+ /// with `with_spatial_join_physical_planner`.
+ spatial_join_planner: SpatialJoinExtensionPlanner,
+}
+
+impl SedonaQueryPlanner {
+ /// Create a new [`SedonaQueryPlanner`] with no spatial join physical
+ /// planners registered.
+ ///
+ /// Until one is added with
+ /// [`with_spatial_join_physical_planner`](Self::with_spatial_join_physical_planner),
+ /// spatial join nodes are planned with the `NestedLoopJoinExec` fallback
+ /// (assuming spatial joins are enabled in `SedonaOptions`).
+ pub fn new() -> Self {
+ Self {
+ spatial_join_planner: SpatialJoinExtensionPlanner::new(vec![]),
+ }
+ }
+
+ /// Append a [`SpatialJoinPhysicalPlanner`] to this query planner.
+ ///
+ /// Physical planners are consulted in reverse order of registration, so a
+ /// planner appended later takes precedence over earlier ones.
+ ///
+ /// Note that [`crate::optimizer::register_spatial_join_logical_optimizer`]
+ /// is required so that spatial joins are rewritten into `SpatialJoinPlanNode`
+ /// extension nodes in the logical plan; without it the registered physical
+ /// planners are never consulted.
+ pub fn with_spatial_join_physical_planner(
+ mut self,
+ factory: Arc<dyn SpatialJoinPhysicalPlanner>,
+ ) -> Self {
+ self.spatial_join_planner
+ .append_spatial_join_physical_planner(factory);
+ self
+ }
+
+ /// Extension planners passed to DataFusion's [`DefaultPhysicalPlanner`].
+ ///
+ /// Cloning the spatial join planner only clones its `Arc` handles to the
+ /// registered physical planners, not the planners themselves.
+ fn extension_planners(&self) -> Vec<Arc<dyn ExtensionPlanner + Send +
Sync>> {
+ vec![Arc::new(self.spatial_join_planner.clone())]
+ }
+}
+
+impl Default for SedonaQueryPlanner {
+ /// Equivalent to [`SedonaQueryPlanner::new`]: no spatial join physical
+ /// planners are registered.
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// Prints only the type name; the registered extension/physical planners are
+// not included in the debug output.
+impl fmt::Debug for SedonaQueryPlanner {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("SedonaQueryPlanner").finish()
+ }
+}
+
+#[async_trait]
+impl QueryPlanner for SedonaQueryPlanner {
+ /// Plan `logical_plan` with a [`DefaultPhysicalPlanner`] configured with
+ /// this planner's extension planners.
+ ///
+ /// A new [`DefaultPhysicalPlanner`] is constructed on every call, so
+ /// planners registered on `self` are always picked up.
+ async fn create_physical_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let physical_planner =
+
DefaultPhysicalPlanner::with_extension_planners(self.extension_planners());
+ physical_planner
+ .create_physical_plan(logical_plan, session_state)
+ .await
+ }
+}
diff --git a/rust/sedona-spatial-join/src/planner/spatial_expr_utils.rs
b/rust/sedona-query-planner/src/spatial_expr_utils.rs
similarity index 95%
rename from rust/sedona-spatial-join/src/planner/spatial_expr_utils.rs
rename to rust/sedona-query-planner/src/spatial_expr_utils.rs
index 700b51bc..2c1a18d2 100644
--- a/rust/sedona-spatial-join/src/planner/spatial_expr_utils.rs
+++ b/rust/sedona-query-planner/src/spatial_expr_utils.rs
@@ -21,23 +21,19 @@ use std::sync::Arc;
use crate::spatial_predicate::{
DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate,
SpatialRelationType,
};
-use arrow_schema::Schema;
use datafusion_common::ScalarValue;
use datafusion_common::{
tree_node::{Transformed, TreeNode},
Column as LogicalColumn, JoinSide,
};
-use datafusion_common::{DFSchema, HashMap, Result};
+use datafusion_common::{DFSchema, HashMap};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::utils::JoinFilter;
-use sedona_common::sedona_internal_err;
use sedona_expr::utils::{parse_distance_predicate, ParsedDistancePredicate};
-use sedona_schema::datatypes::SedonaType;
-use sedona_schema::matchers::ArgMatcher;
/// Collect the names of spatial predicates appeared in expr. We assume that
the given
/// `expr` evaluates to a boolean value and originates from a filter logical
node.
@@ -567,48 +563,6 @@ fn replace_join_filter_expr(expr: &Arc<dyn PhysicalExpr>,
join_filter: &JoinFilt
)
}
-pub(crate) fn is_spatial_predicate_supported(
- spatial_predicate: &SpatialPredicate,
- left_schema: &Schema,
- right_schema: &Schema,
-) -> Result<bool> {
- /// Only spatial predicates working with planar geometry are supported for
optimization.
- /// Geography (spherical) types are explicitly excluded and will not
trigger optimized spatial joins.
- fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema:
&Schema) -> Result<bool> {
- let left_return_field = expr.return_field(schema)?;
- let sedona_type = SedonaType::from_storage_field(&left_return_field)?;
- let matcher = ArgMatcher::is_geometry();
- Ok(matcher.match_type(&sedona_type))
- }
-
- match spatial_predicate {
- SpatialPredicate::Relation(RelationPredicate { left, right, .. })
- | SpatialPredicate::Distance(DistancePredicate { left, right, .. }) =>
{
- Ok(is_geometry_type_supported(left, left_schema)?
- && is_geometry_type_supported(right, right_schema)?)
- }
- SpatialPredicate::KNearestNeighbors(KNNPredicate {
- left,
- right,
- probe_side,
- ..
- }) => {
- let (left, right) = match probe_side {
- JoinSide::Left => (left, right),
- JoinSide::Right => (right, left),
- _ => {
- return sedona_internal_err!(
- "Invalid probe side in KNN predicate: {:?}",
- probe_side
- )
- }
- };
- Ok(is_geometry_type_supported(left, left_schema)?
- && is_geometry_type_supported(right, right_schema)?)
- }
- }
-}
-
/// Which side of the join is the query (probe) side for KNN.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum KNNJoinQuerySide {
@@ -652,7 +606,7 @@ pub(crate) fn find_knn_query_side(
/// Recursively find the `ST_KNN` scalar function call in an expression tree.
///
/// Searches through `AND` binary expressions to find the `ST_KNN` function
call.
-pub(crate) fn find_st_knn_call(expr: &Expr) -> Option<&ScalarFunction> {
+fn find_st_knn_call(expr: &Expr) -> Option<&ScalarFunction> {
match expr {
Expr::ScalarFunction(func) if
func.func.name().eq_ignore_ascii_case("st_knn") => Some(func),
Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr { left, right, op })
@@ -677,7 +631,7 @@ mod tests {
use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::utils::JoinFilter;
- use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};
+ use sedona_schema::datatypes::WKB_GEOMETRY;
use std::sync::Arc;
// Helper function to create a test schema
@@ -2271,84 +2225,6 @@ mod tests {
assert!(result.is_none()); // Should fail - k must be a literal value
}
- #[test]
- fn test_is_spatial_predicate_supported() {
- // Planar geometry field
- let geom_field = WKB_GEOMETRY.to_storage_field("geom", false).unwrap();
- let schema = Arc::new(Schema::new(vec![geom_field.clone()]));
- let col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn
PhysicalExpr>;
- let rel_pred = RelationPredicate::new(
- col_expr.clone(),
- col_expr.clone(),
- SpatialRelationType::Intersects,
- );
- let spatial_pred = SpatialPredicate::Relation(rel_pred);
- assert!(is_spatial_predicate_supported(&spatial_pred, &schema,
&schema).unwrap());
-
- // Geography field (should NOT be supported)
- let geog_field = WKB_GEOGRAPHY.to_storage_field("geog",
false).unwrap();
- let geog_schema = Arc::new(Schema::new(vec![geog_field.clone()]));
- let geog_col_expr = Arc::new(Column::new("geog", 0)) as Arc<dyn
PhysicalExpr>;
- let rel_pred_geog = RelationPredicate::new(
- geog_col_expr.clone(),
- geog_col_expr.clone(),
- SpatialRelationType::Intersects,
- );
- let spatial_pred_geog = SpatialPredicate::Relation(rel_pred_geog);
- assert!(
- !is_spatial_predicate_supported(&spatial_pred_geog, &geog_schema,
&geog_schema)
- .unwrap()
- );
- }
-
- #[test]
- fn test_is_knn_predicate_supported() {
- // ST_KNN(left, right)
- let left_schema = Arc::new(Schema::new(vec![WKB_GEOMETRY
- .to_storage_field("geom", false)
- .unwrap()]));
- let right_schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- WKB_GEOMETRY.to_storage_field("geom", false).unwrap(),
- ]));
- let left_col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn
PhysicalExpr>;
- let right_col_expr = Arc::new(Column::new("geom", 1)) as Arc<dyn
PhysicalExpr>;
- let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
- left_col_expr.clone(),
- right_col_expr.clone(),
- 5,
- false,
- JoinSide::Left,
- ));
- assert!(is_spatial_predicate_supported(&knn_pred, &left_schema,
&right_schema).unwrap());
-
- // ST_KNN(right, left)
- let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
- right_col_expr.clone(),
- left_col_expr.clone(),
- 5,
- false,
- JoinSide::Right,
- ));
- assert!(is_spatial_predicate_supported(&knn_pred, &left_schema,
&right_schema).unwrap());
-
- // ST_KNN with geography (should NOT be supported)
- let left_geog_schema = Arc::new(Schema::new(vec![WKB_GEOGRAPHY
- .to_storage_field("geog", false)
- .unwrap()]));
- assert!(
- !is_spatial_predicate_supported(&knn_pred, &left_geog_schema,
&right_schema).unwrap()
- );
-
- let right_geog_schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- WKB_GEOGRAPHY.to_storage_field("geog", false).unwrap(),
- ]));
- assert!(
- !is_spatial_predicate_supported(&knn_pred, &left_schema,
&right_geog_schema).unwrap()
- );
- }
-
#[test]
fn test_collect_spatial_predicate_names() {
// ST_Intersects should be collected
diff --git a/rust/sedona-query-planner/src/spatial_join_physical_planner.rs
b/rust/sedona-query-planner/src/spatial_join_physical_planner.rs
new file mode 100644
index 00000000..db37c3f1
--- /dev/null
+++ b/rust/sedona-query-planner/src/spatial_join_physical_planner.rs
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Extension planner for spatial join plan nodes.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use arrow_schema::Schema;
+
+use datafusion::config::ConfigOptions;
+use datafusion::execution::session_state::SessionState;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
+use datafusion_common::{plan_err, DFSchema, Result};
+use datafusion_expr::logical_plan::UserDefinedLogicalNode;
+use datafusion_expr::{JoinType, LogicalPlan};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_plan::joins::utils::JoinFilter;
+use datafusion_physical_plan::joins::NestedLoopJoinExec;
+use sedona_common::option::SedonaOptions;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+
+use crate::logical_plan_node::SpatialJoinPlanNode;
+use crate::spatial_expr_utils::transform_join_filter;
+use crate::spatial_predicate::SpatialPredicate;
+
+/// Arguments passed to a [`SpatialJoinPhysicalPlanner`] when planning a spatial join.
+pub struct PlanSpatialJoinArgs<'a> {
+ /// Physical plan for the left input of the join.
+ pub physical_left: &'a Arc<dyn ExecutionPlan>,
+ /// Physical plan for the right input of the join.
+ pub physical_right: &'a Arc<dyn ExecutionPlan>,
+ /// Spatial predicate extracted from the physical join filter.
+ pub spatial_predicate: &'a SpatialPredicate,
+ /// Remaining part of the join filter returned alongside the spatial
+ /// predicate by `transform_join_filter`, if any.
+ pub remainder: Option<&'a JoinFilter>,
+ /// The logical join type of the spatial join node.
+ pub join_type: &'a JoinType,
+ /// Spatial join options taken from the session's `SedonaOptions`.
+ pub join_options: &'a SpatialJoinOptions,
+ /// Configuration options of the current session.
+ pub options: &'a Arc<ConfigOptions>,
+}
+
+/// Factory trait for creating spatial join physical plans.
+///
+/// Implementations decide whether they can handle a given spatial predicate and,
+/// if so, produce an appropriate [`ExecutionPlan`].
+pub trait SpatialJoinPhysicalPlanner: std::fmt::Debug + Send + Sync {
+ /// Given the provided arguments, produce an [`ExecutionPlan`] implementing
+ /// the join operation, or `None` if this implementation cannot execute the join.
+ ///
+ /// When used by [`SpatialJoinExtensionPlanner`], returning `None` lets the
+ /// next registered planner (or the `NestedLoopJoinExec` fallback) handle the
+ /// join, while returning an error aborts physical planning.
+ fn plan_spatial_join(
+ &self,
+ args: &PlanSpatialJoinArgs<'_>,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
+}
+
+/// Physical planner hook for `SpatialJoinPlanNode` logical extension nodes.
+///
+/// Delegates to a list of [`SpatialJoinPhysicalPlanner`] implementations to
+/// produce a physical plan for spatial join nodes, consulting the most
+/// recently added implementation first. Falls back to [`NestedLoopJoinExec`]
+/// when the join filter contains no recognized spatial predicate or when no
+/// implementation can handle it.
+#[derive(Clone, Debug)]
+pub struct SpatialJoinExtensionPlanner {
+ /// Registered physical planners, stored in registration order.
+ factories: Vec<Arc<dyn SpatialJoinPhysicalPlanner>>,
+}
+
+impl SpatialJoinExtensionPlanner {
+ /// Create a new planner with the given physical planners.
+ ///
+ /// Later entries in `factories` take precedence over earlier ones.
+ pub fn new(factories: Vec<Arc<dyn SpatialJoinPhysicalPlanner>>) -> Self {
+ Self { factories }
+ }
+
+ /// Append a new spatial join physical planner.
+ ///
+ /// Implementations are checked in reverse order such that more recently added
+ /// implementations can override earlier ones (including the default join).
+ pub fn append_spatial_join_physical_planner(
+ &mut self,
+ factory: Arc<dyn SpatialJoinPhysicalPlanner>,
+ ) {
+ self.factories.push(factory);
+ }
+}
+
+#[async_trait]
+impl ExtensionPlanner for SpatialJoinExtensionPlanner {
+ /// Plan a `SpatialJoinPlanNode` into a physical join.
+ ///
+ /// Returns `Ok(None)` for any other node type so that other extension
+ /// planners can handle it. For spatial join nodes, the logical join filter
+ /// is converted to a physical [`JoinFilter`]. If a spatial predicate can be
+ /// extracted from it, the registered physical planners are consulted (most
+ /// recently added first). If no predicate is found, or no planner accepts
+ /// the join, a [`NestedLoopJoinExec`] evaluating the full filter is returned.
+ ///
+ /// # Errors
+ ///
+ /// Returns an internal error if `SedonaOptions` is missing from the session
+ /// or spatial joins are disabled, and a planning error if the node does not
+ /// have exactly two inputs. Errors from filter conversion and from the
+ /// physical planners are propagated.
+ async fn plan_extension(
+ &self,
+ _planner: &dyn PhysicalPlanner,
+ node: &dyn UserDefinedLogicalNode,
+ logical_inputs: &[&LogicalPlan],
+ physical_inputs: &[Arc<dyn ExecutionPlan>],
+ session_state: &SessionState,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ // Not a spatial join node: let other extension planners handle it.
+ let Some(spatial_node) =
node.as_any().downcast_ref::<SpatialJoinPlanNode>() else {
+ return Ok(None);
+ };
+
+ let Some(ext) = session_state
+ .config_options()
+ .extensions
+ .get::<SedonaOptions>()
+ else {
+ return sedona_internal_err!("SedonaOptions not found in session
state extensions");
+ };
+
+ // NOTE(review): presumably the logical optimizer does not create
+ // SpatialJoinPlanNode when spatial joins are disabled, so reaching this
+ // point is treated as an internal inconsistency — confirm.
+ if !ext.spatial_join.enable {
+ return sedona_internal_err!("Spatial join is disabled in
SedonaOptions");
+ }
+
+ if logical_inputs.len() != 2 || physical_inputs.len() != 2 {
+ return plan_err!("SpatialJoinPlanNode expects 2 inputs");
+ }
+
+ let join_type = &spatial_node.join_type;
+
+ // Index 0 is treated as the left input and index 1 as the right input.
+ let (physical_left, physical_right) =
+ (physical_inputs[0].clone(), physical_inputs[1].clone());
+
+ let join_filter = logical_join_filter_to_physical(
+ spatial_node,
+ session_state,
+ &physical_left,
+ &physical_right,
+ )?;
+
+ // No spatial predicate could be extracted from the filter: plan a nested
+ // loop join that evaluates the full filter.
+ let Some((spatial_predicate, remainder)) =
transform_join_filter(&join_filter) else {
+ let nlj = NestedLoopJoinExec::try_new(
+ physical_left,
+ physical_right,
+ Some(join_filter),
+ join_type,
+ None,
+ )?;
+ return Ok(Some(Arc::new(nlj)));
+ };
+
+ let args = PlanSpatialJoinArgs {
+ physical_left: &physical_left,
+ physical_right: &physical_right,
+ spatial_predicate: &spatial_predicate,
+ remainder: remainder.as_ref(),
+ join_type,
+ join_options: &ext.spatial_join,
+ options: session_state.config_options(),
+ };
+
+ // Iterate in reverse so that more recently added factories are consulted first
+ for factory in self.factories.iter().rev() {
+ if let Some(exec) = factory.plan_spatial_join(&args)? {
+ return Ok(Some(exec));
+ }
+ }
+
+ // No registered physical planner accepted the join: fall back to a
+ // nested loop join over the original (full) join filter.
+ let nlj = NestedLoopJoinExec::try_new(
+ physical_left,
+ physical_right,
+ Some(join_filter),
+ join_type,
+ None,
+ )?;
+
+ Ok(Some(Arc::new(nlj)))
+ }
+}
+
+/// Convert the logical filter of a `SpatialJoinPlanNode` into a physical
+/// [`JoinFilter`] over the given physical inputs.
+///
+/// The filter's intermediate schema contains only the columns referenced by
+/// the filter: left-side columns first, then right-side columns, each in
+/// ascending index order. Its metadata is the merged schema metadata of both
+/// logical inputs.
+///
+/// # Errors
+///
+/// Returns an error if the intermediate `DFSchema` cannot be built or the
+/// filter expression cannot be converted to a physical expression.
+///
+/// This function is mostly taken from the match arm for handling
+/// `LogicalPlan::Join` in
+/// <https://github.com/apache/datafusion/blob/51.0.0/datafusion/core/src/physical_planner.rs#L1144-L1245>
+fn logical_join_filter_to_physical(
+ plan_node: &SpatialJoinPlanNode,
+ session_state: &SessionState,
+ physical_left: &Arc<dyn ExecutionPlan>,
+ physical_right: &Arc<dyn ExecutionPlan>,
+) -> Result<JoinFilter> {
+ let SpatialJoinPlanNode {
+ left,
+ right,
+ filter,
+ ..
+ } = plan_node;
+
+ let left_df_schema = left.schema();
+ let right_df_schema = right.schema();
+
+ // Collect the set of columns referenced by the filter expression
+ let cols = filter.column_refs();
+
+ // Collect left & right field indices, each sorted in ascending order
+ let mut left_field_indices = cols
+ .iter()
+ .filter_map(|c| left_df_schema.index_of_column(c).ok())
+ .collect::<Vec<_>>();
+ left_field_indices.sort_unstable();
+
+ let mut right_field_indices = cols
+ .iter()
+ .filter_map(|c| right_df_schema.index_of_column(c).ok())
+ .collect::<Vec<_>>();
+ right_field_indices.sort_unstable();
+
+ // Collect DFFields and Fields required for intermediate schemas (left
+ // fields first, then right). Logical fields come from the input DFSchemas
+ // and physical fields from the physical inputs' schemas.
+ let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) =
left_field_indices
+ .clone()
+ .into_iter()
+ .map(|i| {
+ (
+ left_df_schema.qualified_field(i),
+ physical_left.schema().field(i).clone(),
+ )
+ })
+ .chain(right_field_indices.clone().into_iter().map(|i| {
+ (
+ right_df_schema.qualified_field(i),
+ physical_right.schema().field(i).clone(),
+ )
+ }))
+ .unzip();
+ let filter_df_fields = filter_df_fields
+ .into_iter()
+ .map(|(qualifier, field)| (qualifier.cloned(),
Arc::new(field.clone())))
+ .collect::<Vec<_>>();
+
+ // Merge metadata from both sides; on key conflicts the right side's value
+ // wins because it is inserted into the map last.
+ let metadata: HashMap<_, _> = left_df_schema
+ .metadata()
+ .clone()
+ .into_iter()
+ .chain(right_df_schema.metadata().clone())
+ .collect();
+
+ // Construct intermediate schemas used for filtering data and
+ // convert logical expression to physical according to filter schema
+ let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields,
metadata.clone())?;
+ let filter_schema = Schema::new_with_metadata(filter_fields, metadata);
+
+ let filter_expr =
+ create_physical_expr(filter, &filter_df_schema,
session_state.execution_props())?;
+ // Map each intermediate-schema column back to its join side and its index
+ // in that side's schema.
+ let column_indices = JoinFilter::build_column_indices(left_field_indices,
right_field_indices);
+
+ let join_filter = JoinFilter::new(filter_expr, column_indices,
Arc::new(filter_schema));
+ Ok(join_filter)
+}
diff --git a/rust/sedona-spatial-join/src/spatial_predicate.rs
b/rust/sedona-query-planner/src/spatial_predicate.rs
similarity index 100%
rename from rust/sedona-spatial-join/src/spatial_predicate.rs
rename to rust/sedona-query-planner/src/spatial_predicate.rs
diff --git a/rust/sedona-spatial-join/Cargo.toml
b/rust/sedona-spatial-join/Cargo.toml
index e9d830cf..26b084fa 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -61,6 +61,7 @@ sedona-functions = { workspace = true }
sedona-geo = { workspace = true }
sedona-geometry = { workspace = true }
sedona-schema = { workspace = true }
+sedona-query-planner = { workspace = true }
sedona-tg = { workspace = true }
sedona-geos = { workspace = true }
wkb = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/index.rs
b/rust/sedona-spatial-join/src/index.rs
index cd5f38d2..9c3c00c5 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -27,9 +27,8 @@ pub(crate) mod spatial_index_builder;
pub(crate) use build_side_collector::{
BuildPartition, BuildSideBatchesCollector, CollectBuildSideMetrics,
};
-pub(crate) use spatial_index::SpatialIndex;
-
pub(crate) use default_spatial_index_builder::DefaultSpatialIndexBuilder;
+
use wkb::reader::Wkb;
/// The result of a spatial index query
@@ -40,9 +39,13 @@ pub(crate) struct IndexQueryResult<'a, 'b> {
pub position: (i32, i32),
}
+// Public definitions for extensions that define their own indexing
+pub use spatial_index::SpatialIndex;
+pub use spatial_index_builder::SpatialIndexBuilder;
+
/// The metrics for a spatial index query
#[derive(Debug)]
-pub(crate) struct QueryResultMetrics {
+pub struct QueryResultMetrics {
pub count: usize,
pub candidate_count: usize,
}
diff --git a/rust/sedona-spatial-join/src/index/default_spatial_index.rs
b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
index e1c85e77..45028afc 100644
--- a/rust/sedona-spatial-join/src/index/default_spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
@@ -283,7 +283,6 @@ impl SpatialIndex for DefaultSpatialIndex {
self.inner.schema.clone()
}
- #[cfg(test)]
fn num_indexed_batches(&self) -> usize {
self.inner.indexed_batches.len()
}
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs
b/rust/sedona-spatial-join/src/index/spatial_index.rs
index ccad19bd..ecd72923 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -36,11 +36,10 @@ pub const DISTANCE_TOLERANCE: f64 = 1e-9;
/// as well as methods for managing probe statistics and tracking visited
build side batches.
/// The trait is designed to be implemented by various spatial index structures
#[async_trait]
-pub(crate) trait SpatialIndex {
+pub trait SpatialIndex {
/// Returns the schema of the indexed data.
fn schema(&self) -> SchemaRef;
/// Returns the number of batches that have been indexed.
- #[cfg(test)] // This is used for tests
fn num_indexed_batches(&self) -> usize;
/// Get the batch at the given index.
fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch;
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index c51217ba..af21c01b 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -25,7 +25,7 @@ use datafusion_common::Result;
/// Builder for constructing a SpatialIndex from geometry batches.
#[async_trait]
-pub(crate) trait SpatialIndexBuilder: Send + Sync {
+pub trait SpatialIndexBuilder: Send + Sync {
/// Add a stream to this builder
async fn add_stream(
&mut self,
@@ -39,11 +39,11 @@ pub(crate) trait SpatialIndexBuilder: Send + Sync {
/// Metrics for the build phase of the spatial join.
#[derive(Clone, Debug, Default)]
-pub(crate) struct SpatialJoinBuildMetrics {
+pub struct SpatialJoinBuildMetrics {
/// Total time for collecting build-side of join
- pub(crate) build_time: metrics::Time,
+ pub build_time: metrics::Time,
/// Memory used by the spatial-index in bytes
- pub(crate) build_mem_used: metrics::Gauge,
+ pub build_mem_used: metrics::Gauge,
}
impl SpatialJoinBuildMetrics {
diff --git a/rust/sedona-spatial-join/src/join_provider.rs
b/rust/sedona-spatial-join/src/join_provider.rs
index 595a6d12..4bfc5b13 100644
--- a/rust/sedona-spatial-join/src/join_provider.rs
+++ b/rust/sedona-spatial-join/src/join_provider.rs
@@ -38,7 +38,7 @@ use crate::{
/// details of a spatial join. In particular it allows plugging in a custom
/// index for accelerated joins on specific hardware (e.g., GPU) and a custom
/// bounder for specific data types (e.g., geography).
-pub(crate) trait SpatialJoinProvider: std::fmt::Debug + Send + Sync {
+pub trait SpatialJoinProvider: std::fmt::Debug + Send + Sync {
/// Create a new [SpatialIndexBuilder]
fn try_new_spatial_index_builder(
&self,
diff --git a/rust/sedona-spatial-join/src/lib.rs
b/rust/sedona-spatial-join/src/lib.rs
index 372efb7f..3301fc09 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -17,26 +17,26 @@
pub mod evaluated_batch;
pub mod exec;
-mod index;
-mod join_provider;
+pub mod index;
+pub mod join_provider;
pub mod operand_evaluator;
pub mod partitioning;
-pub mod planner;
+pub mod physical_planner;
mod prepare;
mod probe;
pub mod refine;
-pub mod spatial_predicate;
+pub use sedona_query_planner::spatial_predicate;
mod stream;
pub mod utils;
pub use exec::SpatialJoinExec;
-// Re-export function for register the spatial join planner
-pub use planner::register_planner;
+// Re-export the default spatial join physical planner for use with `SedonaQueryPlanner`
+pub use physical_planner::DefaultSpatialJoinPhysicalPlanner;
// Re-export ProbeShuffleExec so that integration tests (and other crates) can
verify
// its presence in optimized physical plans.
-pub use planner::probe_shuffle_exec::ProbeShuffleExec;
+pub use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
// Re-export types needed for external usage (e.g., in Comet)
pub use spatial_predicate::SpatialPredicate;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs
b/rust/sedona-spatial-join/src/operand_evaluator.rs
index 3c316b4c..859c3465 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -45,7 +45,7 @@ use crate::{
/// support non-Cartesian bounding or non-WKB backed geometry arrays. This
also may
/// expand to support more compact or efficient serialization/deserialization
of the
/// evaluated array when spilling.
-pub(crate) trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
+pub trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
/// Create a new [EvaluatedGeometryArray]
fn try_new_evaluated_array(
&self,
diff --git a/rust/sedona-spatial-join/src/physical_planner.rs
b/rust/sedona-spatial-join/src/physical_planner.rs
new file mode 100644
index 00000000..37e7ef34
--- /dev/null
+++ b/rust/sedona-spatial-join/src/physical_planner.rs
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{JoinSide, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
+use sedona_query_planner::spatial_join_physical_planner::{
+ PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{DistancePredicate, KNNPredicate,
RelationPredicate};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+
+use crate::exec::SpatialJoinExec;
+use crate::spatial_predicate::SpatialPredicate;
+
+/// [SpatialJoinPhysicalPlanner] implementation for the default spatial join.
+///
+/// Pass it to `SedonaQueryPlanner::with_spatial_join_physical_planner` so the query planner
+/// can instantiate the [SpatialJoinExec] implemented in this crate.
+#[derive(Debug)]
+pub struct DefaultSpatialJoinPhysicalPlanner;
+
+impl DefaultSpatialJoinPhysicalPlanner {
+ /// Create a new [DefaultSpatialJoinPhysicalPlanner] (the type carries no state).
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl Default for DefaultSpatialJoinPhysicalPlanner {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl SpatialJoinPhysicalPlanner for DefaultSpatialJoinPhysicalPlanner {
+ fn plan_spatial_join(
+ &self,
+ args: &PlanSpatialJoinArgs<'_>,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ if !is_spatial_predicate_supported(
+ args.spatial_predicate,
+ &args.physical_left.schema(),
+ &args.physical_right.schema(),
+ )? {
+ return Ok(None);
+ }
+
+ let should_swap = !matches!(
+ args.spatial_predicate,
+ SpatialPredicate::KNearestNeighbors(_)
+ ) && args.join_type.supports_swap()
+ && should_swap_join_order(
+ args.join_options,
+ args.physical_left.as_ref(),
+ args.physical_right.as_ref(),
+ )?;
+
+ // When `repartition_probe_side` is enabled, wrap the probe input in a ProbeShuffleExec to
+ // break spatial locality in sorted/skewed datasets, leading to more balanced workloads
+ // during out-of-core spatial join. The input wrapped is the one that will be the probe
+ // AFTER any swap; swap_inputs() then carries the ProbeShuffleExec to the correct child
+ // position. KNN joins are never swapped (see `should_swap` above).
+ let (physical_left, physical_right) = if
args.join_options.repartition_probe_side {
+ repartition_probe_side(
+ args.physical_left.clone(),
+ args.physical_right.clone(),
+ args.spatial_predicate,
+ should_swap,
+ )?
+ } else {
+ (args.physical_left.clone(), args.physical_right.clone())
+ };
+
+ let exec = SpatialJoinExec::try_new(
+ physical_left,
+ physical_right,
+ args.spatial_predicate.clone(),
+ args.remainder.cloned(),
+ args.join_type,
+ None,
+ args.join_options,
+ )?;
+
+ if should_swap {
+ exec.swap_inputs().map(Some)
+ } else {
+ Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
+ }
+ }
+}
+
+/// Decide whether to swap the join inputs so that the smaller input becomes the build side.
+/// 1. If both inputs report row counts, swap when the left has strictly more rows: fewer
+///    entries produce a smaller and more efficient spatial index (R-tree).
+/// 2. Otherwise, if both report a total byte size (e.g. CSV sources without row counts),
+///    swap when the left is strictly larger.
+/// 3. Never swap when `spatial_join_reordering` is disabled or neither statistic is
+///    available for both inputs.
+fn should_swap_join_order(
+ spatial_join_options: &SpatialJoinOptions,
+ left: &dyn ExecutionPlan,
+ right: &dyn ExecutionPlan,
+) -> Result<bool> {
+ if !spatial_join_options.spatial_join_reordering {
+ log::info!(
+ "spatial join swap heuristic disabled via
sedona.spatial_join.spatial_join_reordering"
+ );
+ return Ok(false);
+ }
+
+ let left_stats = left.partition_statistics(None)?;
+ let right_stats = right.partition_statistics(None)?;
+
+ let left_num_rows = left_stats.num_rows;
+ let right_num_rows = right_stats.num_rows;
+ let left_total_byte_size = left_stats.total_byte_size;
+ let right_total_byte_size = right_stats.total_byte_size;
+
+ let should_swap = match (left_num_rows.get_value(),
right_num_rows.get_value()) {
+ (Some(l), Some(r)) => l > r,
+ _ => match (
+ left_total_byte_size.get_value(),
+ right_total_byte_size.get_value(),
+ ) {
+ (Some(l), Some(r)) => l > r,
+ _ => false,
+ },
+ };
+
+ log::info!(
+ "spatial join swap heuristic: left_num_rows={left_num_rows:?},
right_num_rows={right_num_rows:?},
left_total_byte_size={left_total_byte_size:?},
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
+ );
+
+ Ok(should_swap)
+}
+
+/// Wrap the probe side of a spatial join in a [ProbeShuffleExec], which repartitions it with
+/// `RoundRobinBatch` partitioning.
+///
+/// The purpose is to break spatial locality in sorted or skewed datasets, which can cause
+/// imbalanced partitions when running out-of-core spatial join. The number of partitions is
+/// preserved; only the distribution of rows across partitions is shuffled.
+///
+/// The `should_swap` parameter indicates whether `swap_inputs()` will be called after
+/// `SpatialJoinExec` is constructed, which decides which pre-swap input becomes the probe:
+/// - For non-KNN predicates the probe is always `Right` after any swap, so when `should_swap`
+///   is true the current `left` (the future probe) is wrapped instead of `right`.
+/// - For KNN predicates `should_swap` is always false and the probe side comes from
+///   `KNNPredicate::probe_side`. Returns `(left, right)` with exactly one input wrapped.
+fn repartition_probe_side(
+ mut physical_left: Arc<dyn ExecutionPlan>,
+ mut physical_right: Arc<dyn ExecutionPlan>,
+ spatial_predicate: &SpatialPredicate,
+ should_swap: bool,
+) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
+ let probe_plan = match spatial_predicate {
+ SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
+ JoinSide::Left => &mut physical_left,
+ JoinSide::Right => &mut physical_right,
+ JoinSide::None => {
+ // KNNPredicate::probe_side is asserted not to be None in its
constructor;
+ // treat this as a debug-only invariant violation and default
to right.
+ debug_assert!(false, "KNNPredicate::probe_side must not be
JoinSide::None");
+ &mut physical_right
+ }
+ },
+ _ => {
+ // For Relation/Distance predicates, probe is always Right after
swap.
+ // If should_swap, the current left will be moved to the right
(probe) by swap_inputs().
+ if should_swap {
+ &mut physical_left
+ } else {
+ &mut physical_right
+ }
+ }
+ };
+
+ *probe_plan = Arc::new(ProbeShuffleExec::try_new(Arc::clone(probe_plan))?);
+
+ Ok((physical_left, physical_right))
+}
+
+/// Whether every operand of `spatial_predicate` evaluates to planar geometry (geography is
+/// unsupported). Errors if an operand type can't be resolved or a KNN probe side is invalid.
+pub fn is_spatial_predicate_supported(
+    spatial_predicate: &SpatialPredicate,
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Result<bool> {
+    fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> Result<bool> {
+        let return_field = expr.return_field(schema)?;
+        let sedona_type = SedonaType::from_storage_field(&return_field)?;
+        let matcher = ArgMatcher::is_geometry();
+        Ok(matcher.match_type(&sedona_type))
+    }
+
+    match spatial_predicate {
+        SpatialPredicate::Relation(RelationPredicate { left, right, .. })
+        | SpatialPredicate::Distance(DistancePredicate { left, right, .. }) => {
+            Ok(is_geometry_type_supported(left, left_schema)?
+                && is_geometry_type_supported(right, right_schema)?)
+        }
+        SpatialPredicate::KNearestNeighbors(KNNPredicate {
+            left,
+            right,
+            probe_side,
+            ..
+        }) => {
+            let (left_expr, right_expr) = match probe_side {
+                JoinSide::Left => (left, right),
+                JoinSide::Right => (right, left), // predicate's `left` reads the right input
+                _ => {
+                    return sedona_internal_err!(
+                        "Invalid probe side in KNN predicate: {:?}",
+                        probe_side
+                    )
+                }
+            };
+            Ok(is_geometry_type_supported(left_expr, left_schema)?
+                && is_geometry_type_supported(right_expr, right_schema)?)
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+ use arrow_schema::{DataType, Field};
+ use datafusion_physical_expr::expressions::Column;
+ use sedona_query_planner::spatial_predicate::SpatialRelationType;
+ use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};
+
+ use super::*;
+
+ #[test]
+ fn test_is_spatial_predicate_supported() {
+ // Planar geometry on both sides of an Intersects relation: supported
+ let geom_field = WKB_GEOMETRY.to_storage_field("geom", false).unwrap();
+ let schema = Arc::new(Schema::new(vec![geom_field.clone()]));
+ let col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn
PhysicalExpr>;
+ let rel_pred = RelationPredicate::new(
+ col_expr.clone(),
+ col_expr.clone(),
+ SpatialRelationType::Intersects,
+ );
+ let spatial_pred = SpatialPredicate::Relation(rel_pred);
+ assert!(is_spatial_predicate_supported(&spatial_pred, &schema,
&schema).unwrap());
+
+ // Geography on both sides (should NOT be supported)
+ let geog_field = WKB_GEOGRAPHY.to_storage_field("geog",
false).unwrap();
+ let geog_schema = Arc::new(Schema::new(vec![geog_field.clone()]));
+ let geog_col_expr = Arc::new(Column::new("geog", 0)) as Arc<dyn
PhysicalExpr>;
+ let rel_pred_geog = RelationPredicate::new(
+ geog_col_expr.clone(),
+ geog_col_expr.clone(),
+ SpatialRelationType::Intersects,
+ );
+ let spatial_pred_geog = SpatialPredicate::Relation(rel_pred_geog);
+ assert!(
+ !is_spatial_predicate_supported(&spatial_pred_geog, &geog_schema,
&geog_schema)
+ .unwrap()
+ );
+ }
+
+ #[test]
+ fn test_is_knn_predicate_supported() {
+ // ST_KNN(left, right), probe_side = Left: first operand is column 0 of the left input
+ let left_schema = Arc::new(Schema::new(vec![WKB_GEOMETRY
+ .to_storage_field("geom", false)
+ .unwrap()]));
+ let right_schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ WKB_GEOMETRY.to_storage_field("geom", false).unwrap(),
+ ]));
+ let left_col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn
PhysicalExpr>;
+ let right_col_expr = Arc::new(Column::new("geom", 1)) as Arc<dyn
PhysicalExpr>;
+ let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
+ left_col_expr.clone(),
+ right_col_expr.clone(),
+ 5,
+ false,
+ JoinSide::Left,
+ ));
+ assert!(is_spatial_predicate_supported(&knn_pred, &left_schema,
&right_schema).unwrap());
+
+ // ST_KNN(right, left), probe_side = Right: first operand is column 1 of the right input
+ let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
+ right_col_expr.clone(),
+ left_col_expr.clone(),
+ 5,
+ false,
+ JoinSide::Right,
+ ));
+ assert!(is_spatial_predicate_supported(&knn_pred, &left_schema,
&right_schema).unwrap());
+
+ // Geography on either input, reusing the probe-right predicate (should NOT be supported)
+ let left_geog_schema = Arc::new(Schema::new(vec![WKB_GEOGRAPHY
+ .to_storage_field("geog", false)
+ .unwrap()]));
+ assert!(
+ !is_spatial_predicate_supported(&knn_pred, &left_geog_schema,
&right_schema).unwrap()
+ );
+
+ let right_geog_schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ WKB_GEOGRAPHY.to_storage_field("geog", false).unwrap(),
+ ]));
+ assert!(
+ !is_spatial_predicate_supported(&knn_pred, &left_schema,
&right_geog_schema).unwrap()
+ );
+ }
+}
diff --git a/rust/sedona-spatial-join/src/planner.rs
b/rust/sedona-spatial-join/src/planner.rs
deleted file mode 100644
index b663b293..00000000
--- a/rust/sedona-spatial-join/src/planner.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! DataFusion planner integration for Sedona spatial joins.
-//!
-//! This module wires Sedona's logical optimizer rules and physical planning
extensions that
-//! can produce `SpatialJoinExec`.
-
-use datafusion::execution::SessionStateBuilder;
-use datafusion_common::Result;
-
-mod logical_plan_node;
-mod optimizer;
-mod physical_planner;
-pub mod probe_shuffle_exec;
-mod spatial_expr_utils;
-
-/// Register Sedona spatial join planning hooks.
-///
-/// Enables logical rewrites (to surface join filters) and a query planner
extension that can
-/// plan `SpatialJoinExec`. This is the primary entry point to leveraging the
spatial join
-/// implementation provided by this crate and ensures joins created by SQL or
using
-/// a DataFrame API that meet certain conditions (e.g. contain a spatial
predicate as
-/// a join condition) are executed using the `SpatialJoinExec`.
-pub fn register_planner(state_builder: SessionStateBuilder) ->
Result<SessionStateBuilder> {
- // Enable the logical rewrite that turns Filter(CrossJoin) into
Join(filter=...)
- let state_builder =
optimizer::register_spatial_join_logical_optimizer(state_builder)?;
-
- // Enable planning SpatialJoinExec via an extension node during
logical->physical planning.
- Ok(physical_planner::register_spatial_join_planner(
- state_builder,
- ))
-}
diff --git a/rust/sedona-spatial-join/src/planner/physical_planner.rs
b/rust/sedona-spatial-join/src/planner/physical_planner.rs
deleted file mode 100644
index 0eb7d0d8..00000000
--- a/rust/sedona-spatial-join/src/planner/physical_planner.rs
+++ /dev/null
@@ -1,360 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-
-use arrow_schema::Schema;
-
-use datafusion::execution::context::QueryPlanner;
-use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner,
PhysicalPlanner};
-use datafusion_common::{plan_err, DFSchema, JoinSide, Result};
-use datafusion_expr::logical_plan::UserDefinedLogicalNode;
-use datafusion_expr::LogicalPlan;
-use datafusion_physical_expr::create_physical_expr;
-use datafusion_physical_plan::joins::utils::JoinFilter;
-use datafusion_physical_plan::joins::NestedLoopJoinExec;
-use sedona_common::sedona_internal_err;
-
-use crate::exec::SpatialJoinExec;
-use crate::planner::logical_plan_node::SpatialJoinPlanNode;
-use crate::planner::probe_shuffle_exec::ProbeShuffleExec;
-use crate::planner::spatial_expr_utils::{is_spatial_predicate_supported,
transform_join_filter};
-use crate::spatial_predicate::SpatialPredicate;
-use sedona_common::option::{SedonaOptions, SpatialJoinOptions};
-
-/// Registers a query planner that can produce [`SpatialJoinExec`] from a
logical extension node.
-pub(crate) fn register_spatial_join_planner(builder: SessionStateBuilder) ->
SessionStateBuilder {
- let extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
- vec![Arc::new(SpatialJoinExtensionPlanner {})];
- builder.with_query_planner(Arc::new(SedonaSpatialQueryPlanner {
extension_planners }))
-}
-
-/// Query planner that enables Sedona's spatial join planning.
-///
-/// Installs an [`ExtensionPlanner`] that recognizes `SpatialJoinPlanNode` and
produces
-/// `SpatialJoinExec` when supported and enabled.
-struct SedonaSpatialQueryPlanner {
- extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
-}
-
-impl fmt::Debug for SedonaSpatialQueryPlanner {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("SedonaSpatialQueryPlanner").finish()
- }
-}
-
-#[async_trait]
-impl QueryPlanner for SedonaSpatialQueryPlanner {
- async fn create_physical_plan(
- &self,
- logical_plan: &LogicalPlan,
- session_state: &SessionState,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let physical_planner =
-
DefaultPhysicalPlanner::with_extension_planners(self.extension_planners.clone());
- physical_planner
- .create_physical_plan(logical_plan, session_state)
- .await
- }
-}
-
-/// Physical planner hook for `SpatialJoinPlanNode`.
-struct SpatialJoinExtensionPlanner;
-
-#[async_trait]
-impl ExtensionPlanner for SpatialJoinExtensionPlanner {
- async fn plan_extension(
- &self,
- _planner: &dyn PhysicalPlanner,
- node: &dyn UserDefinedLogicalNode,
- logical_inputs: &[&LogicalPlan],
- physical_inputs: &[Arc<dyn ExecutionPlan>],
- session_state: &SessionState,
- ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- let Some(spatial_node) =
node.as_any().downcast_ref::<SpatialJoinPlanNode>() else {
- return Ok(None);
- };
-
- let Some(ext) = session_state
- .config_options()
- .extensions
- .get::<SedonaOptions>()
- else {
- return sedona_internal_err!("SedonaOptions not found in session
state extensions");
- };
- let spatial_join_options = &ext.spatial_join;
-
- if !ext.spatial_join.enable {
- return sedona_internal_err!("Spatial join is disabled in
SedonaOptions");
- }
-
- if logical_inputs.len() != 2 || physical_inputs.len() != 2 {
- return plan_err!("SpatialJoinPlanNode expects 2 inputs");
- }
-
- let join_type = &spatial_node.join_type;
-
- let (physical_left, physical_right) =
- (physical_inputs[0].clone(), physical_inputs[1].clone());
-
- let join_filter = logical_join_filter_to_physical(
- spatial_node,
- session_state,
- &physical_left,
- &physical_right,
- )?;
-
- let Some((spatial_predicate, remainder)) =
transform_join_filter(&join_filter) else {
- let nlj = NestedLoopJoinExec::try_new(
- physical_left,
- physical_right,
- Some(join_filter),
- join_type,
- None,
- )?;
- return Ok(Some(Arc::new(nlj)));
- };
-
- if !is_spatial_predicate_supported(
- &spatial_predicate,
- &physical_left.schema(),
- &physical_right.schema(),
- )? {
- let nlj = NestedLoopJoinExec::try_new(
- physical_left,
- physical_right,
- Some(join_filter),
- join_type,
- None,
- )?;
- return Ok(Some(Arc::new(nlj)));
- }
-
- let should_swap = !matches!(spatial_predicate,
SpatialPredicate::KNearestNeighbors(_))
- && join_type.supports_swap()
- && should_swap_join_order(
- spatial_join_options,
- physical_left.as_ref(),
- physical_right.as_ref(),
- )?;
-
- // Repartition the probe side when enabled. This breaks spatial
locality in sorted/skewed
- // datasets, leading to more balanced workloads during out-of-core
spatial join.
- // We determine which pre-swap input will be the probe AFTER any
potential swap, and
- // repartition it here. swap_inputs() will then carry the
RepartitionExec to the correct
- // child position.
- let (physical_left, physical_right) = if
spatial_join_options.repartition_probe_side {
- repartition_probe_side(
- physical_left,
- physical_right,
- &spatial_predicate,
- should_swap,
- )?
- } else {
- (physical_left, physical_right)
- };
-
- let exec = SpatialJoinExec::try_new(
- physical_left,
- physical_right,
- spatial_predicate,
- remainder,
- join_type,
- None,
- spatial_join_options,
- )?;
-
- if should_swap {
- exec.swap_inputs().map(Some)
- } else {
- Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
- }
- }
-}
-
-/// Spatial join reordering heuristic:
-/// 1. Put the input with fewer rows on the build side, because fewer entries
-/// produce a smaller and more efficient spatial index (R-tree).
-/// 2. If row-count statistics are unavailable (for example, for CSV sources),
-/// fall back to total input size as an estimate.
-/// 3. Do not swap the join order if join reordering is disabled or no relevant
-/// statistics are available.
-fn should_swap_join_order(
- spatial_join_options: &SpatialJoinOptions,
- left: &dyn ExecutionPlan,
- right: &dyn ExecutionPlan,
-) -> Result<bool> {
- if !spatial_join_options.spatial_join_reordering {
- log::info!(
- "spatial join swap heuristic disabled via
sedona.spatial_join.spatial_join_reordering"
- );
- return Ok(false);
- }
-
- let left_stats = left.partition_statistics(None)?;
- let right_stats = right.partition_statistics(None)?;
-
- let left_num_rows = left_stats.num_rows;
- let right_num_rows = right_stats.num_rows;
- let left_total_byte_size = left_stats.total_byte_size;
- let right_total_byte_size = right_stats.total_byte_size;
-
- let should_swap = match (left_num_rows.get_value(),
right_num_rows.get_value()) {
- (Some(l), Some(r)) => l > r,
- _ => match (
- left_total_byte_size.get_value(),
- right_total_byte_size.get_value(),
- ) {
- (Some(l), Some(r)) => l > r,
- _ => false,
- },
- };
-
- log::info!(
- "spatial join swap heuristic: left_num_rows={left_num_rows:?},
right_num_rows={right_num_rows:?},
left_total_byte_size={left_total_byte_size:?},
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
- );
-
- Ok(should_swap)
-}
-
-/// This function is mostly taken from the match arm for handling
LogicalPlan::Join in
-///
https://github.com/apache/datafusion/blob/51.0.0/datafusion/core/src/physical_planner.rs#L1144-L1245
-fn logical_join_filter_to_physical(
- plan_node: &SpatialJoinPlanNode,
- session_state: &SessionState,
- physical_left: &Arc<dyn ExecutionPlan>,
- physical_right: &Arc<dyn ExecutionPlan>,
-) -> Result<JoinFilter> {
- let SpatialJoinPlanNode {
- left,
- right,
- filter,
- ..
- } = plan_node;
-
- let left_df_schema = left.schema();
- let right_df_schema = right.schema();
-
- // Extract columns from filter expression and saved in a HashSet
- let cols = filter.column_refs();
-
- // Collect left & right field indices, the field indices are sorted in
ascending order
- let mut left_field_indices = cols
- .iter()
- .filter_map(|c| left_df_schema.index_of_column(c).ok())
- .collect::<Vec<_>>();
- left_field_indices.sort_unstable();
-
- let mut right_field_indices = cols
- .iter()
- .filter_map(|c| right_df_schema.index_of_column(c).ok())
- .collect::<Vec<_>>();
- right_field_indices.sort_unstable();
-
- // Collect DFFields and Fields required for intermediate schemas
- let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) =
left_field_indices
- .clone()
- .into_iter()
- .map(|i| {
- (
- left_df_schema.qualified_field(i),
- physical_left.schema().field(i).clone(),
- )
- })
- .chain(right_field_indices.clone().into_iter().map(|i| {
- (
- right_df_schema.qualified_field(i),
- physical_right.schema().field(i).clone(),
- )
- }))
- .unzip();
- let filter_df_fields = filter_df_fields
- .into_iter()
- .map(|(qualifier, field)| (qualifier.cloned(),
Arc::new(field.clone())))
- .collect::<Vec<_>>();
-
- let metadata: HashMap<_, _> = left_df_schema
- .metadata()
- .clone()
- .into_iter()
- .chain(right_df_schema.metadata().clone())
- .collect();
-
- // Construct intermediate schemas used for filtering data and
- // convert logical expression to physical according to filter schema
- let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields,
metadata.clone())?;
- let filter_schema = Schema::new_with_metadata(filter_fields, metadata);
-
- let filter_expr =
- create_physical_expr(filter, &filter_df_schema,
session_state.execution_props())?;
- let column_indices = JoinFilter::build_column_indices(left_field_indices,
right_field_indices);
-
- let join_filter = JoinFilter::new(filter_expr, column_indices,
Arc::new(filter_schema));
- Ok(join_filter)
-}
-
-/// Repartition the probe side of a spatial join using `RoundRobinBatch`
partitioning.
-///
-/// The purpose is to break spatial locality in sorted or skewed datasets,
which can cause
-/// imbalanced partitions when running out-of-core spatial join. The number of
partitions is
-/// preserved; only the distribution of rows across partitions is shuffled.
-///
-/// The `should_swap` parameter indicates whether `swap_inputs()` will be
called after
-/// `SpatialJoinExec` is constructed. This affects which pre-swap input will
become the
-/// probe side:
-/// - For non-KNN predicates: probe is always `Right` after any swap. If
`should_swap` is true,
-/// the current `left` will become `right` (probe) after swap, so we
repartition `left`.
-/// - For KNN predicates: `should_swap` is always false, and the probe side is
determined by
-/// `KNNPredicate::probe_side`.
-fn repartition_probe_side(
- mut physical_left: Arc<dyn ExecutionPlan>,
- mut physical_right: Arc<dyn ExecutionPlan>,
- spatial_predicate: &SpatialPredicate,
- should_swap: bool,
-) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
- let probe_plan = match spatial_predicate {
- SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
- JoinSide::Left => &mut physical_left,
- JoinSide::Right => &mut physical_right,
- JoinSide::None => {
- // KNNPredicate::probe_side is asserted not to be None in its
constructor;
- // treat this as a debug-only invariant violation and default
to right.
- debug_assert!(false, "KNNPredicate::probe_side must not be
JoinSide::None");
- &mut physical_right
- }
- },
- _ => {
- // For Relation/Distance predicates, probe is always Right after
swap.
- // If should_swap, the current left will be moved to the right
(probe) by swap_inputs().
- if should_swap {
- &mut physical_left
- } else {
- &mut physical_right
- }
- }
- };
-
- *probe_plan = Arc::new(ProbeShuffleExec::try_new(Arc::clone(probe_plan))?);
-
- Ok((physical_left, physical_right))
-}
diff --git a/rust/sedona-spatial-join/tests/spatial_join_integration.rs
b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
index 60f0033b..8cfda7f3 100644
--- a/rust/sedona-spatial-join/tests/spatial_join_integration.rs
+++ b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
@@ -43,13 +43,16 @@ use sedona_common::SedonaOptions;
use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
use sedona_geo::to_geo::GeoTypesExecutor;
use sedona_geometry::types::GeometryTypeId;
+use sedona_query_planner::{
+ optimizer::register_spatial_join_logical_optimizer,
query_planner::SedonaQueryPlanner,
+};
use sedona_schema::{
datatypes::{SedonaType, WKB_GEOGRAPHY, WKB_GEOMETRY},
matchers::ArgMatcher,
};
use sedona_spatial_join::{
- register_planner, spatial_predicate::RelationPredicate, ProbeShuffleExec,
SpatialJoinExec,
- SpatialPredicate,
+ spatial_predicate::RelationPredicate, DefaultSpatialJoinPhysicalPlanner,
ProbeShuffleExec,
+ SpatialJoinExec, SpatialPredicate,
};
use sedona_testing::datagen::RandomPartitionedDataBuilder;
use tokio::sync::OnceCell;
@@ -154,7 +157,12 @@ fn setup_context(options: Option<SpatialJoinOptions>,
batch_size: usize) -> Resu
session_config = add_sedona_option_extension(session_config);
let mut state_builder = SessionStateBuilder::new();
if let Some(options) = options {
- state_builder = register_planner(state_builder)?;
+ state_builder =
register_spatial_join_logical_optimizer(state_builder)?;
+ state_builder = state_builder.with_query_planner(Arc::new(
+
SedonaQueryPlanner::new().with_spatial_join_physical_planner(Arc::new(
+ DefaultSpatialJoinPhysicalPlanner::new(),
+ )),
+ ));
let opts = session_config
.options_mut()
.extensions
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index 051c7841..b0787420 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -82,6 +82,7 @@ sedona-gdal = { workspace = true }
sedona-raster-functions = { workspace = true }
sedona-schema = { workspace = true }
sedona-spatial-join = { workspace = true, optional = true }
+sedona-query-planner = { workspace = true }
sedona-s2geography = { workspace = true, optional = true }
sedona-testing = { workspace = true }
sedona-tg = { workspace = true, optional = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index ecf5362b..b85e3be8 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -64,6 +64,9 @@ use sedona_pointcloud::las::{
format::{Extension, LasFormatFactory},
options::{GeometryEncoding, LasExtraBytes, LasOptions},
};
+use sedona_query_planner::{
+ optimizer::register_spatial_join_logical_optimizer,
query_planner::SedonaQueryPlanner,
+};
/// Sedona SessionContext wrapper
///
@@ -143,11 +146,19 @@ impl SedonaContext {
.with_config(session_config);
// Register the spatial join planner extension
+ let mut planner = SedonaQueryPlanner::new();
#[cfg(feature = "spatial-join")]
{
- state_builder =
sedona_spatial_join::register_planner(state_builder)?;
+ use
sedona_spatial_join::physical_planner::DefaultSpatialJoinPhysicalPlanner;
+
+ planner = planner.with_spatial_join_physical_planner(Arc::new(
+ DefaultSpatialJoinPhysicalPlanner::new(),
+ ));
}
+ state_builder =
register_spatial_join_logical_optimizer(state_builder)?;
+ state_builder = state_builder.with_query_planner(Arc::new(planner));
+
let mut state = state_builder.build();
state.register_file_format(Arc::new(GeoParquetFormatFactory::new()),
true)?;
#[cfg(feature = "pointcloud")]