This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 40e1460d refactor(rust/sedona-query-planner): Move query planner and 
utilities to dedicated crate (#735)
40e1460d is described below

commit 40e1460d15ea64d359b66fa7c070921f24846476
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Apr 7 11:01:54 2026 -0500

    refactor(rust/sedona-query-planner): Move query planner and utilities to 
dedicated crate (#735)
---
 Cargo.lock                                         |  20 ++
 Cargo.toml                                         |   2 +
 rust/sedona-query-planner/Cargo.toml               |  45 +++
 .../src/lib.rs                                     |  33 +-
 .../src}/logical_plan_node.rs                      |   2 +-
 .../src}/optimizer.rs                              |   6 +-
 .../src}/probe_shuffle_exec.rs                     |   0
 rust/sedona-query-planner/src/query_planner.rs     |  93 ++++++
 .../src}/spatial_expr_utils.rs                     | 130 +-------
 .../src/spatial_join_physical_planner.rs           | 252 +++++++++++++++
 .../src/spatial_predicate.rs                       |   0
 rust/sedona-spatial-join/Cargo.toml                |   1 +
 rust/sedona-spatial-join/src/index.rs              |   9 +-
 .../src/index/default_spatial_index.rs             |   1 -
 .../sedona-spatial-join/src/index/spatial_index.rs |   3 +-
 .../src/index/spatial_index_builder.rs             |   8 +-
 rust/sedona-spatial-join/src/join_provider.rs      |   2 +-
 rust/sedona-spatial-join/src/lib.rs                |  12 +-
 rust/sedona-spatial-join/src/operand_evaluator.rs  |   2 +-
 rust/sedona-spatial-join/src/physical_planner.rs   | 332 +++++++++++++++++++
 rust/sedona-spatial-join/src/planner.rs            |  47 ---
 .../src/planner/physical_planner.rs                | 360 ---------------------
 .../tests/spatial_join_integration.rs              |  14 +-
 rust/sedona/Cargo.toml                             |   1 +
 rust/sedona/src/context.rs                         |  13 +-
 25 files changed, 801 insertions(+), 587 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3bafcc4f..567c1a10 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5201,6 +5201,7 @@ dependencies = [
  "sedona-geos",
  "sedona-pointcloud",
  "sedona-proj",
+ "sedona-query-planner",
  "sedona-raster-functions",
  "sedona-s2geography",
  "sedona-schema",
@@ -5602,6 +5603,24 @@ dependencies = [
  "wkb",
 ]
 
+[[package]]
+name = "sedona-query-planner"
+version = "0.4.0"
+dependencies = [
+ "arrow",
+ "arrow-schema",
+ "async-trait",
+ "datafusion",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-schema",
+]
+
 [[package]]
 name = "sedona-raster"
 version = "0.4.0"
@@ -5715,6 +5734,7 @@ dependencies = [
  "sedona-geo-traits-ext",
  "sedona-geometry",
  "sedona-geos",
+ "sedona-query-planner",
  "sedona-schema",
  "sedona-testing",
  "sedona-tg",
diff --git a/Cargo.toml b/Cargo.toml
index 9237305c..478d2ae8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ members = [
     "rust/sedona-raster-functions",
     "rust/sedona-schema",
     "rust/sedona-spatial-join",
+    "rust/sedona-query-planner",
     "rust/sedona-testing",
     "rust/sedona",
     "sedona-cli",
@@ -146,6 +147,7 @@ sedona-raster = { version = "0.4.0", path = 
"rust/sedona-raster" }
 sedona-raster-functions = { version = "0.4.0", path = 
"rust/sedona-raster-functions" }
 sedona-schema = { version = "0.4.0", path = "rust/sedona-schema" }
 sedona-spatial-join = { version = "0.4.0", path = "rust/sedona-spatial-join" }
+sedona-query-planner = { version = "0.4.0", path = "rust/sedona-query-planner" 
}
 sedona-testing = { version = "0.4.0", path = "rust/sedona-testing" }
 
 # C wrapper crates
diff --git a/rust/sedona-query-planner/Cargo.toml 
b/rust/sedona-query-planner/Cargo.toml
new file mode 100644
index 00000000..95ace915
--- /dev/null
+++ b/rust/sedona-query-planner/Cargo.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-query-planner"
+version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description = "Query planner and common traits for spatial join operations in 
Apache SedonaDB"
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+sedona-common = { workspace = true }
+sedona-expr = { workspace = true }
+sedona-schema = { workspace = true }
+
+[dev-dependencies]
+arrow = { workspace = true }
+datafusion = { workspace = true }
+sedona-schema = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/lib.rs 
b/rust/sedona-query-planner/src/lib.rs
similarity index 53%
copy from rust/sedona-spatial-join/src/lib.rs
copy to rust/sedona-query-planner/src/lib.rs
index 372efb7f..4bec6d2b 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-query-planner/src/lib.rs
@@ -15,31 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub mod evaluated_batch;
-pub mod exec;
-mod index;
-mod join_provider;
-pub mod operand_evaluator;
-pub mod partitioning;
-pub mod planner;
-mod prepare;
-mod probe;
-pub mod refine;
+mod logical_plan_node;
+pub mod optimizer;
+pub mod probe_shuffle_exec;
+pub mod query_planner;
+mod spatial_expr_utils;
+pub mod spatial_join_physical_planner;
 pub mod spatial_predicate;
-mod stream;
-pub mod utils;
-
-pub use exec::SpatialJoinExec;
-
-// Re-export function for register the spatial join planner
-pub use planner::register_planner;
-
-// Re-export ProbeShuffleExec so that integration tests (and other crates) can 
verify
-// its presence in optimized physical plans.
-pub use planner::probe_shuffle_exec::ProbeShuffleExec;
-
-// Re-export types needed for external usage (e.g., in Comet)
-pub use spatial_predicate::SpatialPredicate;
-
-// Re-export option types from sedona-common for convenience
-pub use sedona_common::option::*;
diff --git a/rust/sedona-spatial-join/src/planner/logical_plan_node.rs 
b/rust/sedona-query-planner/src/logical_plan_node.rs
similarity index 99%
rename from rust/sedona-spatial-join/src/planner/logical_plan_node.rs
rename to rust/sedona-query-planner/src/logical_plan_node.rs
index 23f35d09..7a9bf952 100644
--- a/rust/sedona-spatial-join/src/planner/logical_plan_node.rs
+++ b/rust/sedona-query-planner/src/logical_plan_node.rs
@@ -29,7 +29,7 @@ use sedona_common::sedona_internal_err;
 /// Carries a join's inputs and filter expression so the physical planner can 
recognize and plan
 /// a `SpatialJoinExec`.
 #[derive(PartialEq, Eq, Hash)]
-pub(crate) struct SpatialJoinPlanNode {
+pub struct SpatialJoinPlanNode {
     pub left: LogicalPlan,
     pub right: LogicalPlan,
     pub join_type: JoinType,
diff --git a/rust/sedona-spatial-join/src/planner/optimizer.rs 
b/rust/sedona-query-planner/src/optimizer.rs
similarity index 99%
rename from rust/sedona-spatial-join/src/planner/optimizer.rs
rename to rust/sedona-query-planner/src/optimizer.rs
index bacaf7a9..7a669038 100644
--- a/rust/sedona-spatial-join/src/planner/optimizer.rs
+++ b/rust/sedona-query-planner/src/optimizer.rs
@@ -16,8 +16,8 @@
 // under the License.
 use std::sync::Arc;
 
-use crate::planner::logical_plan_node::SpatialJoinPlanNode;
-use crate::planner::spatial_expr_utils::{
+use crate::logical_plan_node::SpatialJoinPlanNode;
+use crate::spatial_expr_utils::{
     collect_spatial_predicate_names, find_knn_query_side, KNNJoinQuerySide,
 };
 use datafusion::execution::session_state::SessionStateBuilder;
@@ -52,7 +52,7 @@ use sedona_common::{sedona_internal_datafusion_err, 
sedona_internal_err};
 ///
 /// - `SpatialJoinLogicalRewrite` is appended at the end so that non-KNN 
spatial joins still
 ///   benefit from filter pushdown before being converted to extension nodes.
-pub(crate) fn register_spatial_join_logical_optimizer(
+pub fn register_spatial_join_logical_optimizer(
     mut session_state_builder: SessionStateBuilder,
 ) -> Result<SessionStateBuilder> {
     let optimizer = session_state_builder
diff --git a/rust/sedona-spatial-join/src/planner/probe_shuffle_exec.rs 
b/rust/sedona-query-planner/src/probe_shuffle_exec.rs
similarity index 100%
rename from rust/sedona-spatial-join/src/planner/probe_shuffle_exec.rs
rename to rust/sedona-query-planner/src/probe_shuffle_exec.rs
diff --git a/rust/sedona-query-planner/src/query_planner.rs 
b/rust/sedona-query-planner/src/query_planner.rs
new file mode 100644
index 00000000..8fd2551d
--- /dev/null
+++ b/rust/sedona-query-planner/src/query_planner.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Query planner that delegates to DataFusion's [`DefaultPhysicalPlanner`]
+//! with a configurable set of [`ExtensionPlanner`]s.
+
+use std::fmt;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::execution::context::QueryPlanner;
+use datafusion::execution::session_state::SessionState;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner};
+use datafusion_common::Result;
+use datafusion_expr::LogicalPlan;
+
+use crate::spatial_join_physical_planner::{
+    SpatialJoinExtensionPlanner, SpatialJoinPhysicalPlanner,
+};
+
+/// Query planner that wraps DataFusion's [`DefaultPhysicalPlanner`] with a set
+/// of extension planners that handle custom logical nodes (e.g. spatial 
joins).
+pub struct SedonaQueryPlanner {
+    spatial_join_planner: SpatialJoinExtensionPlanner,
+}
+
+impl SedonaQueryPlanner {
+    /// Create a new [`SedonaQueryPlanner`] with no spatial join physical planners registered.
+    pub fn new() -> Self {
+        Self {
+            spatial_join_planner: SpatialJoinExtensionPlanner::new(vec![]),
+        }
+    }
+
+    /// Append a [SpatialJoinPhysicalPlanner] to the planner
+    ///
+    /// Note that [crate::optimizer::register_spatial_join_logical_optimizer] 
is required
+    /// to ensure a SpatialJoinPlanNode exists in a logical plan.
+    pub fn with_spatial_join_physical_planner(
+        mut self,
+        factory: Arc<dyn SpatialJoinPhysicalPlanner>,
+    ) -> Self {
+        self.spatial_join_planner
+            .append_spatial_join_physical_planner(factory);
+        self
+    }
+
+    fn extension_planners(&self) -> Vec<Arc<dyn ExtensionPlanner + Send + 
Sync>> {
+        vec![Arc::new(self.spatial_join_planner.clone())]
+    }
+}
+
+impl Default for SedonaQueryPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl fmt::Debug for SedonaQueryPlanner {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SedonaQueryPlanner").finish()
+    }
+}
+
+#[async_trait]
+impl QueryPlanner for SedonaQueryPlanner {
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let physical_planner =
+            
DefaultPhysicalPlanner::with_extension_planners(self.extension_planners());
+        physical_planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
diff --git a/rust/sedona-spatial-join/src/planner/spatial_expr_utils.rs 
b/rust/sedona-query-planner/src/spatial_expr_utils.rs
similarity index 95%
rename from rust/sedona-spatial-join/src/planner/spatial_expr_utils.rs
rename to rust/sedona-query-planner/src/spatial_expr_utils.rs
index 700b51bc..2c1a18d2 100644
--- a/rust/sedona-spatial-join/src/planner/spatial_expr_utils.rs
+++ b/rust/sedona-query-planner/src/spatial_expr_utils.rs
@@ -21,23 +21,19 @@ use std::sync::Arc;
 use crate::spatial_predicate::{
     DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate, 
SpatialRelationType,
 };
-use arrow_schema::Schema;
 use datafusion_common::ScalarValue;
 use datafusion_common::{
     tree_node::{Transformed, TreeNode},
     Column as LogicalColumn, JoinSide,
 };
-use datafusion_common::{DFSchema, HashMap, Result};
+use datafusion_common::{DFSchema, HashMap};
 use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::{Expr, Operator};
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
 use datafusion_physical_plan::joins::utils::ColumnIndex;
 use datafusion_physical_plan::joins::utils::JoinFilter;
-use sedona_common::sedona_internal_err;
 use sedona_expr::utils::{parse_distance_predicate, ParsedDistancePredicate};
-use sedona_schema::datatypes::SedonaType;
-use sedona_schema::matchers::ArgMatcher;
 
 /// Collect the names of spatial predicates appeared in expr. We assume that 
the given
 /// `expr` evaluates to a boolean value and originates from a filter logical 
node.
@@ -567,48 +563,6 @@ fn replace_join_filter_expr(expr: &Arc<dyn PhysicalExpr>, 
join_filter: &JoinFilt
     )
 }
 
-pub(crate) fn is_spatial_predicate_supported(
-    spatial_predicate: &SpatialPredicate,
-    left_schema: &Schema,
-    right_schema: &Schema,
-) -> Result<bool> {
-    /// Only spatial predicates working with planar geometry are supported for 
optimization.
-    /// Geography (spherical) types are explicitly excluded and will not 
trigger optimized spatial joins.
-    fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema: 
&Schema) -> Result<bool> {
-        let left_return_field = expr.return_field(schema)?;
-        let sedona_type = SedonaType::from_storage_field(&left_return_field)?;
-        let matcher = ArgMatcher::is_geometry();
-        Ok(matcher.match_type(&sedona_type))
-    }
-
-    match spatial_predicate {
-        SpatialPredicate::Relation(RelationPredicate { left, right, .. })
-        | SpatialPredicate::Distance(DistancePredicate { left, right, .. }) => 
{
-            Ok(is_geometry_type_supported(left, left_schema)?
-                && is_geometry_type_supported(right, right_schema)?)
-        }
-        SpatialPredicate::KNearestNeighbors(KNNPredicate {
-            left,
-            right,
-            probe_side,
-            ..
-        }) => {
-            let (left, right) = match probe_side {
-                JoinSide::Left => (left, right),
-                JoinSide::Right => (right, left),
-                _ => {
-                    return sedona_internal_err!(
-                        "Invalid probe side in KNN predicate: {:?}",
-                        probe_side
-                    )
-                }
-            };
-            Ok(is_geometry_type_supported(left, left_schema)?
-                && is_geometry_type_supported(right, right_schema)?)
-        }
-    }
-}
-
 /// Which side of the join is the query (probe) side for KNN.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub(crate) enum KNNJoinQuerySide {
@@ -652,7 +606,7 @@ pub(crate) fn find_knn_query_side(
 /// Recursively find the `ST_KNN` scalar function call in an expression tree.
 ///
 /// Searches through `AND` binary expressions to find the `ST_KNN` function 
call.
-pub(crate) fn find_st_knn_call(expr: &Expr) -> Option<&ScalarFunction> {
+fn find_st_knn_call(expr: &Expr) -> Option<&ScalarFunction> {
     match expr {
         Expr::ScalarFunction(func) if 
func.func.name().eq_ignore_ascii_case("st_knn") => Some(func),
         Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr { left, right, op })
@@ -677,7 +631,7 @@ mod tests {
     use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
     use datafusion_physical_plan::joins::utils::ColumnIndex;
     use datafusion_physical_plan::joins::utils::JoinFilter;
-    use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};
+    use sedona_schema::datatypes::WKB_GEOMETRY;
     use std::sync::Arc;
 
     // Helper function to create a test schema
@@ -2271,84 +2225,6 @@ mod tests {
         assert!(result.is_none()); // Should fail - k must be a literal value
     }
 
-    #[test]
-    fn test_is_spatial_predicate_supported() {
-        // Planar geometry field
-        let geom_field = WKB_GEOMETRY.to_storage_field("geom", false).unwrap();
-        let schema = Arc::new(Schema::new(vec![geom_field.clone()]));
-        let col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn 
PhysicalExpr>;
-        let rel_pred = RelationPredicate::new(
-            col_expr.clone(),
-            col_expr.clone(),
-            SpatialRelationType::Intersects,
-        );
-        let spatial_pred = SpatialPredicate::Relation(rel_pred);
-        assert!(is_spatial_predicate_supported(&spatial_pred, &schema, 
&schema).unwrap());
-
-        // Geography field (should NOT be supported)
-        let geog_field = WKB_GEOGRAPHY.to_storage_field("geog", 
false).unwrap();
-        let geog_schema = Arc::new(Schema::new(vec![geog_field.clone()]));
-        let geog_col_expr = Arc::new(Column::new("geog", 0)) as Arc<dyn 
PhysicalExpr>;
-        let rel_pred_geog = RelationPredicate::new(
-            geog_col_expr.clone(),
-            geog_col_expr.clone(),
-            SpatialRelationType::Intersects,
-        );
-        let spatial_pred_geog = SpatialPredicate::Relation(rel_pred_geog);
-        assert!(
-            !is_spatial_predicate_supported(&spatial_pred_geog, &geog_schema, 
&geog_schema)
-                .unwrap()
-        );
-    }
-
-    #[test]
-    fn test_is_knn_predicate_supported() {
-        // ST_KNN(left, right)
-        let left_schema = Arc::new(Schema::new(vec![WKB_GEOMETRY
-            .to_storage_field("geom", false)
-            .unwrap()]));
-        let right_schema = Arc::new(Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            WKB_GEOMETRY.to_storage_field("geom", false).unwrap(),
-        ]));
-        let left_col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn 
PhysicalExpr>;
-        let right_col_expr = Arc::new(Column::new("geom", 1)) as Arc<dyn 
PhysicalExpr>;
-        let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
-            left_col_expr.clone(),
-            right_col_expr.clone(),
-            5,
-            false,
-            JoinSide::Left,
-        ));
-        assert!(is_spatial_predicate_supported(&knn_pred, &left_schema, 
&right_schema).unwrap());
-
-        // ST_KNN(right, left)
-        let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
-            right_col_expr.clone(),
-            left_col_expr.clone(),
-            5,
-            false,
-            JoinSide::Right,
-        ));
-        assert!(is_spatial_predicate_supported(&knn_pred, &left_schema, 
&right_schema).unwrap());
-
-        // ST_KNN with geography (should NOT be supported)
-        let left_geog_schema = Arc::new(Schema::new(vec![WKB_GEOGRAPHY
-            .to_storage_field("geog", false)
-            .unwrap()]));
-        assert!(
-            !is_spatial_predicate_supported(&knn_pred, &left_geog_schema, 
&right_schema).unwrap()
-        );
-
-        let right_geog_schema = Arc::new(Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            WKB_GEOGRAPHY.to_storage_field("geog", false).unwrap(),
-        ]));
-        assert!(
-            !is_spatial_predicate_supported(&knn_pred, &left_schema, 
&right_geog_schema).unwrap()
-        );
-    }
-
     #[test]
     fn test_collect_spatial_predicate_names() {
         // ST_Intersects should be collected
diff --git a/rust/sedona-query-planner/src/spatial_join_physical_planner.rs 
b/rust/sedona-query-planner/src/spatial_join_physical_planner.rs
new file mode 100644
index 00000000..db37c3f1
--- /dev/null
+++ b/rust/sedona-query-planner/src/spatial_join_physical_planner.rs
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Extension planner for spatial join plan nodes.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use arrow_schema::Schema;
+
+use datafusion::config::ConfigOptions;
+use datafusion::execution::session_state::SessionState;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
+use datafusion_common::{plan_err, DFSchema, Result};
+use datafusion_expr::logical_plan::UserDefinedLogicalNode;
+use datafusion_expr::{JoinType, LogicalPlan};
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_plan::joins::utils::JoinFilter;
+use datafusion_physical_plan::joins::NestedLoopJoinExec;
+use sedona_common::option::SedonaOptions;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+
+use crate::logical_plan_node::SpatialJoinPlanNode;
+use crate::spatial_expr_utils::transform_join_filter;
+use crate::spatial_predicate::SpatialPredicate;
+
+/// Arguments passed to a [`SpatialJoinPhysicalPlanner`] when planning a 
spatial join.
+pub struct PlanSpatialJoinArgs<'a> {
+    pub physical_left: &'a Arc<dyn ExecutionPlan>,
+    pub physical_right: &'a Arc<dyn ExecutionPlan>,
+    pub spatial_predicate: &'a SpatialPredicate,
+    pub remainder: Option<&'a JoinFilter>,
+    pub join_type: &'a JoinType,
+    pub join_options: &'a SpatialJoinOptions,
+    pub options: &'a Arc<ConfigOptions>,
+}
+
+/// Factory trait for creating spatial join physical plans.
+///
+/// Implementations decide whether they can handle a given spatial predicate 
and,
+/// if so, produce an appropriate [`ExecutionPlan`].
+pub trait SpatialJoinPhysicalPlanner: std::fmt::Debug + Send + Sync {
+    /// Given the provided arguments, produce an [ExecutionPlan] implementing
+    /// the join operation or `None` if this implementation cannot execute the 
join
+    fn plan_spatial_join(
+        &self,
+        args: &PlanSpatialJoinArgs<'_>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
+}
+
+/// Physical planner hook for [`SpatialJoinPlanNode`].
+///
+/// Delegates to a list of [`SpatialJoinPhysicalPlanner`] implementations to
+/// produce a physical plan for spatial join nodes. Falls back to
+/// [`NestedLoopJoinExec`] when no factory can handle the predicate.
+#[derive(Clone, Debug)]
+pub struct SpatialJoinExtensionPlanner {
+    factories: Vec<Arc<dyn SpatialJoinPhysicalPlanner>>,
+}
+
+impl SpatialJoinExtensionPlanner {
+    /// Create a new planner with the given factories.
+    pub fn new(factories: Vec<Arc<dyn SpatialJoinPhysicalPlanner>>) -> Self {
+        Self { factories }
+    }
+
+    /// Append a new join factory
+    ///
+    /// Implementations are checked in reverse order such that more recently 
added
+    /// implementations can override the default join.
+    pub fn append_spatial_join_physical_planner(
+        &mut self,
+        factory: Arc<dyn SpatialJoinPhysicalPlanner>,
+    ) {
+        self.factories.push(factory);
+    }
+}
+
+#[async_trait]
+impl ExtensionPlanner for SpatialJoinExtensionPlanner {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let Some(spatial_node) = 
node.as_any().downcast_ref::<SpatialJoinPlanNode>() else {
+            return Ok(None);
+        };
+
+        let Some(ext) = session_state
+            .config_options()
+            .extensions
+            .get::<SedonaOptions>()
+        else {
+            return sedona_internal_err!("SedonaOptions not found in session 
state extensions");
+        };
+
+        if !ext.spatial_join.enable {
+            return sedona_internal_err!("Spatial join is disabled in 
SedonaOptions");
+        }
+
+        if logical_inputs.len() != 2 || physical_inputs.len() != 2 {
+            return plan_err!("SpatialJoinPlanNode expects 2 inputs");
+        }
+
+        let join_type = &spatial_node.join_type;
+
+        let (physical_left, physical_right) =
+            (physical_inputs[0].clone(), physical_inputs[1].clone());
+
+        let join_filter = logical_join_filter_to_physical(
+            spatial_node,
+            session_state,
+            &physical_left,
+            &physical_right,
+        )?;
+
+        let Some((spatial_predicate, remainder)) = 
transform_join_filter(&join_filter) else {
+            let nlj = NestedLoopJoinExec::try_new(
+                physical_left,
+                physical_right,
+                Some(join_filter),
+                join_type,
+                None,
+            )?;
+            return Ok(Some(Arc::new(nlj)));
+        };
+
+        let args = PlanSpatialJoinArgs {
+            physical_left: &physical_left,
+            physical_right: &physical_right,
+            spatial_predicate: &spatial_predicate,
+            remainder: remainder.as_ref(),
+            join_type,
+            join_options: &ext.spatial_join,
+            options: session_state.config_options(),
+        };
+
+        // Iterate over in reverse to handle more recently added factories 
first
+        for factory in self.factories.iter().rev() {
+            if let Some(exec) = factory.plan_spatial_join(&args)? {
+                return Ok(Some(exec));
+            }
+        }
+
+        let nlj = NestedLoopJoinExec::try_new(
+            physical_left,
+            physical_right,
+            Some(join_filter),
+            join_type,
+            None,
+        )?;
+
+        Ok(Some(Arc::new(nlj)))
+    }
+}
+
+/// This function is mostly taken from the match arm for handling 
LogicalPlan::Join in
+/// 
https://github.com/apache/datafusion/blob/51.0.0/datafusion/core/src/physical_planner.rs#L1144-L1245
+fn logical_join_filter_to_physical(
+    plan_node: &SpatialJoinPlanNode,
+    session_state: &SessionState,
+    physical_left: &Arc<dyn ExecutionPlan>,
+    physical_right: &Arc<dyn ExecutionPlan>,
+) -> Result<JoinFilter> {
+    let SpatialJoinPlanNode {
+        left,
+        right,
+        filter,
+        ..
+    } = plan_node;
+
+    let left_df_schema = left.schema();
+    let right_df_schema = right.schema();
+
+    // Extract the columns referenced by the filter expression into a HashSet
+    let cols = filter.column_refs();
+
+    // Collect left & right field indices, the field indices are sorted in 
ascending order
+    let mut left_field_indices = cols
+        .iter()
+        .filter_map(|c| left_df_schema.index_of_column(c).ok())
+        .collect::<Vec<_>>();
+    left_field_indices.sort_unstable();
+
+    let mut right_field_indices = cols
+        .iter()
+        .filter_map(|c| right_df_schema.index_of_column(c).ok())
+        .collect::<Vec<_>>();
+    right_field_indices.sort_unstable();
+
+    // Collect DFFields and Fields required for intermediate schemas
+    let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = 
left_field_indices
+        .clone()
+        .into_iter()
+        .map(|i| {
+            (
+                left_df_schema.qualified_field(i),
+                physical_left.schema().field(i).clone(),
+            )
+        })
+        .chain(right_field_indices.clone().into_iter().map(|i| {
+            (
+                right_df_schema.qualified_field(i),
+                physical_right.schema().field(i).clone(),
+            )
+        }))
+        .unzip();
+    let filter_df_fields = filter_df_fields
+        .into_iter()
+        .map(|(qualifier, field)| (qualifier.cloned(), 
Arc::new(field.clone())))
+        .collect::<Vec<_>>();
+
+    let metadata: HashMap<_, _> = left_df_schema
+        .metadata()
+        .clone()
+        .into_iter()
+        .chain(right_df_schema.metadata().clone())
+        .collect();
+
+    // Construct intermediate schemas used for filtering data and
+    // convert logical expression to physical according to filter schema
+    let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields, 
metadata.clone())?;
+    let filter_schema = Schema::new_with_metadata(filter_fields, metadata);
+
+    let filter_expr =
+        create_physical_expr(filter, &filter_df_schema, 
session_state.execution_props())?;
+    let column_indices = JoinFilter::build_column_indices(left_field_indices, 
right_field_indices);
+
+    let join_filter = JoinFilter::new(filter_expr, column_indices, 
Arc::new(filter_schema));
+    Ok(join_filter)
+}
diff --git a/rust/sedona-spatial-join/src/spatial_predicate.rs 
b/rust/sedona-query-planner/src/spatial_predicate.rs
similarity index 100%
rename from rust/sedona-spatial-join/src/spatial_predicate.rs
rename to rust/sedona-query-planner/src/spatial_predicate.rs
diff --git a/rust/sedona-spatial-join/Cargo.toml 
b/rust/sedona-spatial-join/Cargo.toml
index e9d830cf..26b084fa 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -61,6 +61,7 @@ sedona-functions = { workspace = true }
 sedona-geo = { workspace = true }
 sedona-geometry = { workspace = true }
 sedona-schema = { workspace = true }
+sedona-query-planner = { workspace = true }
 sedona-tg = { workspace = true }
 sedona-geos = { workspace = true }
 wkb = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/index.rs 
b/rust/sedona-spatial-join/src/index.rs
index cd5f38d2..9c3c00c5 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -27,9 +27,8 @@ pub(crate) mod spatial_index_builder;
 pub(crate) use build_side_collector::{
     BuildPartition, BuildSideBatchesCollector, CollectBuildSideMetrics,
 };
-pub(crate) use spatial_index::SpatialIndex;
-
 pub(crate) use default_spatial_index_builder::DefaultSpatialIndexBuilder;
+
 use wkb::reader::Wkb;
 
 /// The result of a spatial index query
@@ -40,9 +39,13 @@ pub(crate) struct IndexQueryResult<'a, 'b> {
     pub position: (i32, i32),
 }
 
+// Public definitions for extensions that define their own indexing
+pub use spatial_index::SpatialIndex;
+pub use spatial_index_builder::SpatialIndexBuilder;
+
 /// The metrics for a spatial index query
 #[derive(Debug)]
-pub(crate) struct QueryResultMetrics {
+pub struct QueryResultMetrics {
     pub count: usize,
     pub candidate_count: usize,
 }
diff --git a/rust/sedona-spatial-join/src/index/default_spatial_index.rs 
b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
index e1c85e77..45028afc 100644
--- a/rust/sedona-spatial-join/src/index/default_spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
@@ -283,7 +283,6 @@ impl SpatialIndex for DefaultSpatialIndex {
         self.inner.schema.clone()
     }
 
-    #[cfg(test)]
     fn num_indexed_batches(&self) -> usize {
         self.inner.indexed_batches.len()
     }
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs 
b/rust/sedona-spatial-join/src/index/spatial_index.rs
index ccad19bd..ecd72923 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -36,11 +36,10 @@ pub const DISTANCE_TOLERANCE: f64 = 1e-9;
 /// as well as methods for managing probe statistics and tracking visited 
build side batches.
 /// The trait is designed to be implemented by various spatial index structures
 #[async_trait]
-pub(crate) trait SpatialIndex {
+pub trait SpatialIndex {
     /// Returns the schema of the indexed data.
     fn schema(&self) -> SchemaRef;
     /// Returns the number of batches that have been indexed.
-    #[cfg(test)] // This is used for tests
     fn num_indexed_batches(&self) -> usize;
     /// Get the batch at the given index.
     fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch;
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs 
b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index c51217ba..af21c01b 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -25,7 +25,7 @@ use datafusion_common::Result;
 
 /// Builder for constructing a SpatialIndex from geometry batches.
 #[async_trait]
-pub(crate) trait SpatialIndexBuilder: Send + Sync {
+pub trait SpatialIndexBuilder: Send + Sync {
     /// Add a stream to this builder
     async fn add_stream(
         &mut self,
@@ -39,11 +39,11 @@ pub(crate) trait SpatialIndexBuilder: Send + Sync {
 
 /// Metrics for the build phase of the spatial join.
 #[derive(Clone, Debug, Default)]
-pub(crate) struct SpatialJoinBuildMetrics {
+pub struct SpatialJoinBuildMetrics {
     /// Total time for collecting build-side of join
-    pub(crate) build_time: metrics::Time,
+    pub build_time: metrics::Time,
     /// Memory used by the spatial-index in bytes
-    pub(crate) build_mem_used: metrics::Gauge,
+    pub build_mem_used: metrics::Gauge,
 }
 
 impl SpatialJoinBuildMetrics {
diff --git a/rust/sedona-spatial-join/src/join_provider.rs 
b/rust/sedona-spatial-join/src/join_provider.rs
index 595a6d12..4bfc5b13 100644
--- a/rust/sedona-spatial-join/src/join_provider.rs
+++ b/rust/sedona-spatial-join/src/join_provider.rs
@@ -38,7 +38,7 @@ use crate::{
 /// details of a spatial join. In particular it allows plugging in a custom
 /// index for accelerated joins on specific hardware (e.g., GPU) and a custom
 /// bounder for specific data types (e.g., geography).
-pub(crate) trait SpatialJoinProvider: std::fmt::Debug + Send + Sync {
+pub trait SpatialJoinProvider: std::fmt::Debug + Send + Sync {
     /// Create a new [SpatialIndexBuilder]
     fn try_new_spatial_index_builder(
         &self,
diff --git a/rust/sedona-spatial-join/src/lib.rs 
b/rust/sedona-spatial-join/src/lib.rs
index 372efb7f..3301fc09 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -17,26 +17,26 @@
 
 pub mod evaluated_batch;
 pub mod exec;
-mod index;
-mod join_provider;
+pub mod index;
+pub mod join_provider;
 pub mod operand_evaluator;
 pub mod partitioning;
-pub mod planner;
+pub mod physical_planner;
 mod prepare;
 mod probe;
 pub mod refine;
-pub mod spatial_predicate;
+pub use sedona_query_planner::spatial_predicate;
 mod stream;
 pub mod utils;
 
 pub use exec::SpatialJoinExec;
 
 // Re-export function for register the spatial join planner
-pub use planner::register_planner;
+pub use physical_planner::DefaultSpatialJoinPhysicalPlanner;
 
 // Re-export ProbeShuffleExec so that integration tests (and other crates) can 
verify
 // its presence in optimized physical plans.
-pub use planner::probe_shuffle_exec::ProbeShuffleExec;
+pub use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
 
 // Re-export types needed for external usage (e.g., in Comet)
 pub use spatial_predicate::SpatialPredicate;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs 
b/rust/sedona-spatial-join/src/operand_evaluator.rs
index 3c316b4c..859c3465 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -45,7 +45,7 @@ use crate::{
 /// support non-Cartesian bounding or non-WKB backed geometry arrays. This 
also may
 /// expand to support more compact or efficient serialization/deserialization 
of the
 /// evaluated array when spilling.
-pub(crate) trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
+pub trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
     /// Create a new [EvaluatedGeometryArray]
     fn try_new_evaluated_array(
         &self,
diff --git a/rust/sedona-spatial-join/src/physical_planner.rs 
b/rust/sedona-spatial-join/src/physical_planner.rs
new file mode 100644
index 00000000..37e7ef34
--- /dev/null
+++ b/rust/sedona-spatial-join/src/physical_planner.rs
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{JoinSide, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec;
+use sedona_query_planner::spatial_join_physical_planner::{
+    PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{DistancePredicate, KNNPredicate, 
RelationPredicate};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+
+use crate::exec::SpatialJoinExec;
+use crate::spatial_predicate::SpatialPredicate;
+
+/// [SpatialJoinPhysicalPlanner] implementation for the default spatial join
+///
+/// This struct is the entry point that lets the SedonaQueryPlanner instantiate
+/// the [ExecutionPlan] implemented in this crate ([SpatialJoinExec]).
+#[derive(Debug)]
+pub struct DefaultSpatialJoinPhysicalPlanner;
+
+impl DefaultSpatialJoinPhysicalPlanner {
+    /// Create a new default spatial join physical planner
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for DefaultSpatialJoinPhysicalPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SpatialJoinPhysicalPlanner for DefaultSpatialJoinPhysicalPlanner {
+    fn plan_spatial_join(
+        &self,
+        args: &PlanSpatialJoinArgs<'_>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if !is_spatial_predicate_supported(
+            args.spatial_predicate,
+            &args.physical_left.schema(),
+            &args.physical_right.schema(),
+        )? {
+            return Ok(None);
+        }
+
+        let should_swap = !matches!(
+            args.spatial_predicate,
+            SpatialPredicate::KNearestNeighbors(_)
+        ) && args.join_type.supports_swap()
+            && should_swap_join_order(
+                args.join_options,
+                args.physical_left.as_ref(),
+                args.physical_right.as_ref(),
+            )?;
+
+        // Repartition the probe side when enabled. This breaks spatial 
locality in sorted/skewed
+        // datasets, leading to more balanced workloads during out-of-core 
spatial join.
+        // We determine which pre-swap input will be the probe AFTER any 
potential swap, and
+        // repartition it here. swap_inputs() will then carry the 
ProbeShuffleExec to the correct
+        // child position.
+        let (physical_left, physical_right) = if 
args.join_options.repartition_probe_side {
+            repartition_probe_side(
+                args.physical_left.clone(),
+                args.physical_right.clone(),
+                args.spatial_predicate,
+                should_swap,
+            )?
+        } else {
+            (args.physical_left.clone(), args.physical_right.clone())
+        };
+
+        let exec = SpatialJoinExec::try_new(
+            physical_left,
+            physical_right,
+            args.spatial_predicate.clone(),
+            args.remainder.cloned(),
+            args.join_type,
+            None,
+            args.join_options,
+        )?;
+
+        if should_swap {
+            exec.swap_inputs().map(Some)
+        } else {
+            Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
+        }
+    }
+}
+
+/// Spatial join reordering heuristic:
+/// 1. Put the input with fewer rows on the build side, because fewer entries
+///    produce a smaller and more efficient spatial index (R-tree).
+/// 2. If row-count statistics are unavailable (for example, for CSV sources),
+///    fall back to total input size as an estimate.
+/// 3. Do not swap the join order if join reordering is disabled or no relevant
+///    statistics are available.
+fn should_swap_join_order(
+    spatial_join_options: &SpatialJoinOptions,
+    left: &dyn ExecutionPlan,
+    right: &dyn ExecutionPlan,
+) -> Result<bool> {
+    if !spatial_join_options.spatial_join_reordering {
+        log::info!(
+            "spatial join swap heuristic disabled via 
sedona.spatial_join.spatial_join_reordering"
+        );
+        return Ok(false);
+    }
+
+    let left_stats = left.partition_statistics(None)?;
+    let right_stats = right.partition_statistics(None)?;
+
+    let left_num_rows = left_stats.num_rows;
+    let right_num_rows = right_stats.num_rows;
+    let left_total_byte_size = left_stats.total_byte_size;
+    let right_total_byte_size = right_stats.total_byte_size;
+
+    let should_swap = match (left_num_rows.get_value(), 
right_num_rows.get_value()) {
+        (Some(l), Some(r)) => l > r,
+        _ => match (
+            left_total_byte_size.get_value(),
+            right_total_byte_size.get_value(),
+        ) {
+            (Some(l), Some(r)) => l > r,
+            _ => false,
+        },
+    };
+
+    log::info!(
+        "spatial join swap heuristic: left_num_rows={left_num_rows:?}, 
right_num_rows={right_num_rows:?}, 
left_total_byte_size={left_total_byte_size:?}, 
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
+    );
+
+    Ok(should_swap)
+}
+
+/// Repartition the probe side of a spatial join using `RoundRobinBatch` 
partitioning.
+///
+/// The purpose is to break spatial locality in sorted or skewed datasets, 
which can cause
+/// imbalanced partitions when running out-of-core spatial join. The number of 
partitions is
+/// preserved; only the distribution of rows across partitions is shuffled.
+///
+/// The `should_swap` parameter indicates whether `swap_inputs()` will be 
called after
+/// `SpatialJoinExec` is constructed. This affects which pre-swap input will 
become the
+/// probe side:
+/// - For non-KNN predicates: probe is always `Right` after any swap. If 
`should_swap` is true,
+///   the current `left` will become `right` (probe) after swap, so we 
repartition `left`.
+/// - For KNN predicates: `should_swap` is always false, and the probe side is 
determined by
+///   `KNNPredicate::probe_side`.
+fn repartition_probe_side(
+    mut physical_left: Arc<dyn ExecutionPlan>,
+    mut physical_right: Arc<dyn ExecutionPlan>,
+    spatial_predicate: &SpatialPredicate,
+    should_swap: bool,
+) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
+    let probe_plan = match spatial_predicate {
+        SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
+            JoinSide::Left => &mut physical_left,
+            JoinSide::Right => &mut physical_right,
+            JoinSide::None => {
+                // KNNPredicate::probe_side is asserted not to be None in its 
constructor;
+                // treat this as a debug-only invariant violation and default 
to right.
+                debug_assert!(false, "KNNPredicate::probe_side must not be 
JoinSide::None");
+                &mut physical_right
+            }
+        },
+        _ => {
+            // For Relation/Distance predicates, probe is always Right after 
swap.
+            // If should_swap, the current left will be moved to the right 
(probe) by swap_inputs().
+            if should_swap {
+                &mut physical_left
+            } else {
+                &mut physical_right
+            }
+        }
+    };
+
+    *probe_plan = Arc::new(ProbeShuffleExec::try_new(Arc::clone(probe_plan))?);
+
+    Ok((physical_left, physical_right))
+}
+
+pub fn is_spatial_predicate_supported(
+    spatial_predicate: &SpatialPredicate,
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Result<bool> {
+    /// Only spatial predicates working with planar geometry are supported for 
optimization.
+    /// Geography (spherical) types are explicitly excluded and will not 
trigger optimized spatial joins.
+    fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema: 
&Schema) -> Result<bool> {
+        let left_return_field = expr.return_field(schema)?;
+        let sedona_type = SedonaType::from_storage_field(&left_return_field)?;
+        let matcher = ArgMatcher::is_geometry();
+        Ok(matcher.match_type(&sedona_type))
+    }
+
+    match spatial_predicate {
+        SpatialPredicate::Relation(RelationPredicate { left, right, .. })
+        | SpatialPredicate::Distance(DistancePredicate { left, right, .. }) => 
{
+            Ok(is_geometry_type_supported(left, left_schema)?
+                && is_geometry_type_supported(right, right_schema)?)
+        }
+        SpatialPredicate::KNearestNeighbors(KNNPredicate {
+            left,
+            right,
+            probe_side,
+            ..
+        }) => {
+            let (left, right) = match probe_side {
+                JoinSide::Left => (left, right),
+                JoinSide::Right => (right, left),
+                _ => {
+                    return sedona_internal_err!(
+                        "Invalid probe side in KNN predicate: {:?}",
+                        probe_side
+                    )
+                }
+            };
+            Ok(is_geometry_type_supported(left, left_schema)?
+                && is_geometry_type_supported(right, right_schema)?)
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_schema::{DataType, Field};
+    use datafusion_physical_expr::expressions::Column;
+    use sedona_query_planner::spatial_predicate::SpatialRelationType;
+    use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};
+
+    use super::*;
+
+    #[test]
+    fn test_is_spatial_predicate_supported() {
+        // Planar geometry field
+        let geom_field = WKB_GEOMETRY.to_storage_field("geom", false).unwrap();
+        let schema = Arc::new(Schema::new(vec![geom_field.clone()]));
+        let col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn 
PhysicalExpr>;
+        let rel_pred = RelationPredicate::new(
+            col_expr.clone(),
+            col_expr.clone(),
+            SpatialRelationType::Intersects,
+        );
+        let spatial_pred = SpatialPredicate::Relation(rel_pred);
+        assert!(is_spatial_predicate_supported(&spatial_pred, &schema, 
&schema).unwrap());
+
+        // Geography field (should NOT be supported)
+        let geog_field = WKB_GEOGRAPHY.to_storage_field("geog", 
false).unwrap();
+        let geog_schema = Arc::new(Schema::new(vec![geog_field.clone()]));
+        let geog_col_expr = Arc::new(Column::new("geog", 0)) as Arc<dyn 
PhysicalExpr>;
+        let rel_pred_geog = RelationPredicate::new(
+            geog_col_expr.clone(),
+            geog_col_expr.clone(),
+            SpatialRelationType::Intersects,
+        );
+        let spatial_pred_geog = SpatialPredicate::Relation(rel_pred_geog);
+        assert!(
+            !is_spatial_predicate_supported(&spatial_pred_geog, &geog_schema, 
&geog_schema)
+                .unwrap()
+        );
+    }
+
+    #[test]
+    fn test_is_knn_predicate_supported() {
+        // ST_KNN(left, right)
+        let left_schema = Arc::new(Schema::new(vec![WKB_GEOMETRY
+            .to_storage_field("geom", false)
+            .unwrap()]));
+        let right_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            WKB_GEOMETRY.to_storage_field("geom", false).unwrap(),
+        ]));
+        let left_col_expr = Arc::new(Column::new("geom", 0)) as Arc<dyn 
PhysicalExpr>;
+        let right_col_expr = Arc::new(Column::new("geom", 1)) as Arc<dyn 
PhysicalExpr>;
+        let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
+            left_col_expr.clone(),
+            right_col_expr.clone(),
+            5,
+            false,
+            JoinSide::Left,
+        ));
+        assert!(is_spatial_predicate_supported(&knn_pred, &left_schema, 
&right_schema).unwrap());
+
+        // ST_KNN(right, left)
+        let knn_pred = SpatialPredicate::KNearestNeighbors(KNNPredicate::new(
+            right_col_expr.clone(),
+            left_col_expr.clone(),
+            5,
+            false,
+            JoinSide::Right,
+        ));
+        assert!(is_spatial_predicate_supported(&knn_pred, &left_schema, 
&right_schema).unwrap());
+
+        // ST_KNN with geography (should NOT be supported)
+        let left_geog_schema = Arc::new(Schema::new(vec![WKB_GEOGRAPHY
+            .to_storage_field("geog", false)
+            .unwrap()]));
+        assert!(
+            !is_spatial_predicate_supported(&knn_pred, &left_geog_schema, 
&right_schema).unwrap()
+        );
+
+        let right_geog_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            WKB_GEOGRAPHY.to_storage_field("geog", false).unwrap(),
+        ]));
+        assert!(
+            !is_spatial_predicate_supported(&knn_pred, &left_schema, 
&right_geog_schema).unwrap()
+        );
+    }
+}
diff --git a/rust/sedona-spatial-join/src/planner.rs 
b/rust/sedona-spatial-join/src/planner.rs
deleted file mode 100644
index b663b293..00000000
--- a/rust/sedona-spatial-join/src/planner.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! DataFusion planner integration for Sedona spatial joins.
-//!
-//! This module wires Sedona's logical optimizer rules and physical planning 
extensions that
-//! can produce `SpatialJoinExec`.
-
-use datafusion::execution::SessionStateBuilder;
-use datafusion_common::Result;
-
-mod logical_plan_node;
-mod optimizer;
-mod physical_planner;
-pub mod probe_shuffle_exec;
-mod spatial_expr_utils;
-
-/// Register Sedona spatial join planning hooks.
-///
-/// Enables logical rewrites (to surface join filters) and a query planner 
extension that can
-/// plan `SpatialJoinExec`. This is the primary entry point to leveraging the 
spatial join
-/// implementation provided by this crate and ensures joins created by SQL or 
using
-/// a DataFrame API that meet certain conditions (e.g. contain a spatial 
predicate as
-/// a join condition) are executed using the `SpatialJoinExec`.
-pub fn register_planner(state_builder: SessionStateBuilder) -> 
Result<SessionStateBuilder> {
-    // Enable the logical rewrite that turns Filter(CrossJoin) into 
Join(filter=...)
-    let state_builder = 
optimizer::register_spatial_join_logical_optimizer(state_builder)?;
-
-    // Enable planning SpatialJoinExec via an extension node during 
logical->physical planning.
-    Ok(physical_planner::register_spatial_join_planner(
-        state_builder,
-    ))
-}
diff --git a/rust/sedona-spatial-join/src/planner/physical_planner.rs 
b/rust/sedona-spatial-join/src/planner/physical_planner.rs
deleted file mode 100644
index 0eb7d0d8..00000000
--- a/rust/sedona-spatial-join/src/planner/physical_planner.rs
+++ /dev/null
@@ -1,360 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-
-use arrow_schema::Schema;
-
-use datafusion::execution::context::QueryPlanner;
-use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner};
-use datafusion_common::{plan_err, DFSchema, JoinSide, Result};
-use datafusion_expr::logical_plan::UserDefinedLogicalNode;
-use datafusion_expr::LogicalPlan;
-use datafusion_physical_expr::create_physical_expr;
-use datafusion_physical_plan::joins::utils::JoinFilter;
-use datafusion_physical_plan::joins::NestedLoopJoinExec;
-use sedona_common::sedona_internal_err;
-
-use crate::exec::SpatialJoinExec;
-use crate::planner::logical_plan_node::SpatialJoinPlanNode;
-use crate::planner::probe_shuffle_exec::ProbeShuffleExec;
-use crate::planner::spatial_expr_utils::{is_spatial_predicate_supported, 
transform_join_filter};
-use crate::spatial_predicate::SpatialPredicate;
-use sedona_common::option::{SedonaOptions, SpatialJoinOptions};
-
-/// Registers a query planner that can produce [`SpatialJoinExec`] from a 
logical extension node.
-pub(crate) fn register_spatial_join_planner(builder: SessionStateBuilder) -> 
SessionStateBuilder {
-    let extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
-        vec![Arc::new(SpatialJoinExtensionPlanner {})];
-    builder.with_query_planner(Arc::new(SedonaSpatialQueryPlanner { 
extension_planners }))
-}
-
-/// Query planner that enables Sedona's spatial join planning.
-///
-/// Installs an [`ExtensionPlanner`] that recognizes `SpatialJoinPlanNode` and 
produces
-/// `SpatialJoinExec` when supported and enabled.
-struct SedonaSpatialQueryPlanner {
-    extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
-}
-
-impl fmt::Debug for SedonaSpatialQueryPlanner {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("SedonaSpatialQueryPlanner").finish()
-    }
-}
-
-#[async_trait]
-impl QueryPlanner for SedonaSpatialQueryPlanner {
-    async fn create_physical_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        session_state: &SessionState,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let physical_planner =
-            
DefaultPhysicalPlanner::with_extension_planners(self.extension_planners.clone());
-        physical_planner
-            .create_physical_plan(logical_plan, session_state)
-            .await
-    }
-}
-
-/// Physical planner hook for `SpatialJoinPlanNode`.
-struct SpatialJoinExtensionPlanner;
-
-#[async_trait]
-impl ExtensionPlanner for SpatialJoinExtensionPlanner {
-    async fn plan_extension(
-        &self,
-        _planner: &dyn PhysicalPlanner,
-        node: &dyn UserDefinedLogicalNode,
-        logical_inputs: &[&LogicalPlan],
-        physical_inputs: &[Arc<dyn ExecutionPlan>],
-        session_state: &SessionState,
-    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        let Some(spatial_node) = 
node.as_any().downcast_ref::<SpatialJoinPlanNode>() else {
-            return Ok(None);
-        };
-
-        let Some(ext) = session_state
-            .config_options()
-            .extensions
-            .get::<SedonaOptions>()
-        else {
-            return sedona_internal_err!("SedonaOptions not found in session 
state extensions");
-        };
-        let spatial_join_options = &ext.spatial_join;
-
-        if !ext.spatial_join.enable {
-            return sedona_internal_err!("Spatial join is disabled in 
SedonaOptions");
-        }
-
-        if logical_inputs.len() != 2 || physical_inputs.len() != 2 {
-            return plan_err!("SpatialJoinPlanNode expects 2 inputs");
-        }
-
-        let join_type = &spatial_node.join_type;
-
-        let (physical_left, physical_right) =
-            (physical_inputs[0].clone(), physical_inputs[1].clone());
-
-        let join_filter = logical_join_filter_to_physical(
-            spatial_node,
-            session_state,
-            &physical_left,
-            &physical_right,
-        )?;
-
-        let Some((spatial_predicate, remainder)) = 
transform_join_filter(&join_filter) else {
-            let nlj = NestedLoopJoinExec::try_new(
-                physical_left,
-                physical_right,
-                Some(join_filter),
-                join_type,
-                None,
-            )?;
-            return Ok(Some(Arc::new(nlj)));
-        };
-
-        if !is_spatial_predicate_supported(
-            &spatial_predicate,
-            &physical_left.schema(),
-            &physical_right.schema(),
-        )? {
-            let nlj = NestedLoopJoinExec::try_new(
-                physical_left,
-                physical_right,
-                Some(join_filter),
-                join_type,
-                None,
-            )?;
-            return Ok(Some(Arc::new(nlj)));
-        }
-
-        let should_swap = !matches!(spatial_predicate, 
SpatialPredicate::KNearestNeighbors(_))
-            && join_type.supports_swap()
-            && should_swap_join_order(
-                spatial_join_options,
-                physical_left.as_ref(),
-                physical_right.as_ref(),
-            )?;
-
-        // Repartition the probe side when enabled. This breaks spatial 
locality in sorted/skewed
-        // datasets, leading to more balanced workloads during out-of-core 
spatial join.
-        // We determine which pre-swap input will be the probe AFTER any 
potential swap, and
-        // repartition it here. swap_inputs() will then carry the 
RepartitionExec to the correct
-        // child position.
-        let (physical_left, physical_right) = if 
spatial_join_options.repartition_probe_side {
-            repartition_probe_side(
-                physical_left,
-                physical_right,
-                &spatial_predicate,
-                should_swap,
-            )?
-        } else {
-            (physical_left, physical_right)
-        };
-
-        let exec = SpatialJoinExec::try_new(
-            physical_left,
-            physical_right,
-            spatial_predicate,
-            remainder,
-            join_type,
-            None,
-            spatial_join_options,
-        )?;
-
-        if should_swap {
-            exec.swap_inputs().map(Some)
-        } else {
-            Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
-        }
-    }
-}
-
-/// Spatial join reordering heuristic:
-/// 1. Put the input with fewer rows on the build side, because fewer entries
-///    produce a smaller and more efficient spatial index (R-tree).
-/// 2. If row-count statistics are unavailable (for example, for CSV sources),
-///    fall back to total input size as an estimate.
-/// 3. Do not swap the join order if join reordering is disabled or no relevant
-///    statistics are available.
-fn should_swap_join_order(
-    spatial_join_options: &SpatialJoinOptions,
-    left: &dyn ExecutionPlan,
-    right: &dyn ExecutionPlan,
-) -> Result<bool> {
-    if !spatial_join_options.spatial_join_reordering {
-        log::info!(
-            "spatial join swap heuristic disabled via 
sedona.spatial_join.spatial_join_reordering"
-        );
-        return Ok(false);
-    }
-
-    let left_stats = left.partition_statistics(None)?;
-    let right_stats = right.partition_statistics(None)?;
-
-    let left_num_rows = left_stats.num_rows;
-    let right_num_rows = right_stats.num_rows;
-    let left_total_byte_size = left_stats.total_byte_size;
-    let right_total_byte_size = right_stats.total_byte_size;
-
-    let should_swap = match (left_num_rows.get_value(), 
right_num_rows.get_value()) {
-        (Some(l), Some(r)) => l > r,
-        _ => match (
-            left_total_byte_size.get_value(),
-            right_total_byte_size.get_value(),
-        ) {
-            (Some(l), Some(r)) => l > r,
-            _ => false,
-        },
-    };
-
-    log::info!(
-        "spatial join swap heuristic: left_num_rows={left_num_rows:?}, 
right_num_rows={right_num_rows:?}, 
left_total_byte_size={left_total_byte_size:?}, 
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
-    );
-
-    Ok(should_swap)
-}
-
-/// This function is mostly taken from the match arm for handling 
LogicalPlan::Join in
-/// 
https://github.com/apache/datafusion/blob/51.0.0/datafusion/core/src/physical_planner.rs#L1144-L1245
-fn logical_join_filter_to_physical(
-    plan_node: &SpatialJoinPlanNode,
-    session_state: &SessionState,
-    physical_left: &Arc<dyn ExecutionPlan>,
-    physical_right: &Arc<dyn ExecutionPlan>,
-) -> Result<JoinFilter> {
-    let SpatialJoinPlanNode {
-        left,
-        right,
-        filter,
-        ..
-    } = plan_node;
-
-    let left_df_schema = left.schema();
-    let right_df_schema = right.schema();
-
-    // Extract columns from filter expression and saved in a HashSet
-    let cols = filter.column_refs();
-
-    // Collect left & right field indices, the field indices are sorted in 
ascending order
-    let mut left_field_indices = cols
-        .iter()
-        .filter_map(|c| left_df_schema.index_of_column(c).ok())
-        .collect::<Vec<_>>();
-    left_field_indices.sort_unstable();
-
-    let mut right_field_indices = cols
-        .iter()
-        .filter_map(|c| right_df_schema.index_of_column(c).ok())
-        .collect::<Vec<_>>();
-    right_field_indices.sort_unstable();
-
-    // Collect DFFields and Fields required for intermediate schemas
-    let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = 
left_field_indices
-        .clone()
-        .into_iter()
-        .map(|i| {
-            (
-                left_df_schema.qualified_field(i),
-                physical_left.schema().field(i).clone(),
-            )
-        })
-        .chain(right_field_indices.clone().into_iter().map(|i| {
-            (
-                right_df_schema.qualified_field(i),
-                physical_right.schema().field(i).clone(),
-            )
-        }))
-        .unzip();
-    let filter_df_fields = filter_df_fields
-        .into_iter()
-        .map(|(qualifier, field)| (qualifier.cloned(), 
Arc::new(field.clone())))
-        .collect::<Vec<_>>();
-
-    let metadata: HashMap<_, _> = left_df_schema
-        .metadata()
-        .clone()
-        .into_iter()
-        .chain(right_df_schema.metadata().clone())
-        .collect();
-
-    // Construct intermediate schemas used for filtering data and
-    // convert logical expression to physical according to filter schema
-    let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields, 
metadata.clone())?;
-    let filter_schema = Schema::new_with_metadata(filter_fields, metadata);
-
-    let filter_expr =
-        create_physical_expr(filter, &filter_df_schema, 
session_state.execution_props())?;
-    let column_indices = JoinFilter::build_column_indices(left_field_indices, 
right_field_indices);
-
-    let join_filter = JoinFilter::new(filter_expr, column_indices, 
Arc::new(filter_schema));
-    Ok(join_filter)
-}
-
-/// Repartition the probe side of a spatial join using `RoundRobinBatch` 
partitioning.
-///
-/// The purpose is to break spatial locality in sorted or skewed datasets, 
which can cause
-/// imbalanced partitions when running out-of-core spatial join. The number of 
partitions is
-/// preserved; only the distribution of rows across partitions is shuffled.
-///
-/// The `should_swap` parameter indicates whether `swap_inputs()` will be 
called after
-/// `SpatialJoinExec` is constructed. This affects which pre-swap input will 
become the
-/// probe side:
-/// - For non-KNN predicates: probe is always `Right` after any swap. If 
`should_swap` is true,
-///   the current `left` will become `right` (probe) after swap, so we 
repartition `left`.
-/// - For KNN predicates: `should_swap` is always false, and the probe side is 
determined by
-///   `KNNPredicate::probe_side`.
-fn repartition_probe_side(
-    mut physical_left: Arc<dyn ExecutionPlan>,
-    mut physical_right: Arc<dyn ExecutionPlan>,
-    spatial_predicate: &SpatialPredicate,
-    should_swap: bool,
-) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
-    let probe_plan = match spatial_predicate {
-        SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
-            JoinSide::Left => &mut physical_left,
-            JoinSide::Right => &mut physical_right,
-            JoinSide::None => {
-                // KNNPredicate::probe_side is asserted not to be None in its 
constructor;
-                // treat this as a debug-only invariant violation and default 
to right.
-                debug_assert!(false, "KNNPredicate::probe_side must not be 
JoinSide::None");
-                &mut physical_right
-            }
-        },
-        _ => {
-            // For Relation/Distance predicates, probe is always Right after 
swap.
-            // If should_swap, the current left will be moved to the right 
(probe) by swap_inputs().
-            if should_swap {
-                &mut physical_left
-            } else {
-                &mut physical_right
-            }
-        }
-    };
-
-    *probe_plan = Arc::new(ProbeShuffleExec::try_new(Arc::clone(probe_plan))?);
-
-    Ok((physical_left, physical_right))
-}
diff --git a/rust/sedona-spatial-join/tests/spatial_join_integration.rs 
b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
index 60f0033b..8cfda7f3 100644
--- a/rust/sedona-spatial-join/tests/spatial_join_integration.rs
+++ b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
@@ -43,13 +43,16 @@ use sedona_common::SedonaOptions;
 use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
 use sedona_geo::to_geo::GeoTypesExecutor;
 use sedona_geometry::types::GeometryTypeId;
+use sedona_query_planner::{
+    optimizer::register_spatial_join_logical_optimizer, 
query_planner::SedonaQueryPlanner,
+};
 use sedona_schema::{
     datatypes::{SedonaType, WKB_GEOGRAPHY, WKB_GEOMETRY},
     matchers::ArgMatcher,
 };
 use sedona_spatial_join::{
-    register_planner, spatial_predicate::RelationPredicate, ProbeShuffleExec, 
SpatialJoinExec,
-    SpatialPredicate,
+    spatial_predicate::RelationPredicate, DefaultSpatialJoinPhysicalPlanner, 
ProbeShuffleExec,
+    SpatialJoinExec, SpatialPredicate,
 };
 use sedona_testing::datagen::RandomPartitionedDataBuilder;
 use tokio::sync::OnceCell;
@@ -154,7 +157,12 @@ fn setup_context(options: Option<SpatialJoinOptions>, 
batch_size: usize) -> Resu
     session_config = add_sedona_option_extension(session_config);
     let mut state_builder = SessionStateBuilder::new();
     if let Some(options) = options {
-        state_builder = register_planner(state_builder)?;
+        state_builder = 
register_spatial_join_logical_optimizer(state_builder)?;
+        state_builder = state_builder.with_query_planner(Arc::new(
+            
SedonaQueryPlanner::new().with_spatial_join_physical_planner(Arc::new(
+                DefaultSpatialJoinPhysicalPlanner::new(),
+            )),
+        ));
         let opts = session_config
             .options_mut()
             .extensions
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index 051c7841..b0787420 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -82,6 +82,7 @@ sedona-gdal = { workspace = true }
 sedona-raster-functions = { workspace = true }
 sedona-schema = { workspace = true }
 sedona-spatial-join = { workspace = true, optional = true }
+sedona-query-planner = { workspace = true }
 sedona-s2geography = { workspace = true, optional = true }
 sedona-testing = { workspace = true }
 sedona-tg = { workspace = true, optional = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index ecf5362b..b85e3be8 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -64,6 +64,9 @@ use sedona_pointcloud::las::{
     format::{Extension, LasFormatFactory},
     options::{GeometryEncoding, LasExtraBytes, LasOptions},
 };
+use sedona_query_planner::{
+    optimizer::register_spatial_join_logical_optimizer, 
query_planner::SedonaQueryPlanner,
+};
 
 /// Sedona SessionContext wrapper
 ///
@@ -143,11 +146,19 @@ impl SedonaContext {
             .with_config(session_config);
 
         // Register the spatial join planner extension
+        let mut planner = SedonaQueryPlanner::new();
         #[cfg(feature = "spatial-join")]
         {
-            state_builder = 
sedona_spatial_join::register_planner(state_builder)?;
+            use 
sedona_spatial_join::physical_planner::DefaultSpatialJoinPhysicalPlanner;
+
+            planner = planner.with_spatial_join_physical_planner(Arc::new(
+                DefaultSpatialJoinPhysicalPlanner::new(),
+            ));
         }
 
+        state_builder = 
register_spatial_join_logical_optimizer(state_builder)?;
+        state_builder = state_builder.with_query_planner(Arc::new(planner));
+
         let mut state = state_builder.build();
         state.register_file_format(Arc::new(GeoParquetFormatFactory::new()), 
true)?;
         #[cfg(feature = "pointcloud")]

Reply via email to