paleolimbot commented on code in PR #735: URL: https://github.com/apache/sedona-db/pull/735#discussion_r2984299388
########## rust/sedona-query-planner/src/spatial_join_factory.rs: ########## @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Extension planner for spatial join plan nodes. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; + +use arrow_schema::Schema; + +use datafusion::config::ConfigOptions; +use datafusion::execution::session_state::SessionState; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; +use datafusion_common::{plan_err, DFSchema, Result}; +use datafusion_expr::logical_plan::UserDefinedLogicalNode; +use datafusion_expr::{JoinType, LogicalPlan}; +use datafusion_physical_expr::create_physical_expr; +use datafusion_physical_plan::joins::utils::JoinFilter; +use datafusion_physical_plan::joins::NestedLoopJoinExec; +use sedona_common::option::SedonaOptions; +use sedona_common::{sedona_internal_err, SpatialJoinOptions}; + +use crate::logical_plan_node::SpatialJoinPlanNode; +use crate::spatial_expr_utils::transform_join_filter; +use crate::spatial_predicate::SpatialPredicate; + +/// Arguments passed to a [`SpatialJoinFactory`] when planning a spatial join. 
+pub struct PlanSpatialJoinArgs<'a> { + pub physical_left: &'a Arc<dyn ExecutionPlan>, + pub physical_right: &'a Arc<dyn ExecutionPlan>, + pub spatial_predicate: &'a SpatialPredicate, + pub remainder: Option<&'a JoinFilter>, + pub join_type: &'a JoinType, + pub join_options: &'a SpatialJoinOptions, + pub options: &'a Arc<ConfigOptions>, +} + +/// Factory trait for creating spatial join physical plans. +/// +/// Implementations decide whether they can handle a given spatial predicate and, +/// if so, produce an appropriate [`ExecutionPlan`]. +pub trait SpatialJoinFactory: std::fmt::Debug + Send + Sync { + /// Given the provided arguments, produce an [ExecutionPlan] implementing + /// the join operation or `None` if this implementation cannot execute the join + fn plan_spatial_join( + &self, + args: &PlanSpatialJoinArgs<'_>, + ) -> Result<Option<Arc<dyn ExecutionPlan>>>; +} Review Comment: This is the new piece of code that allows the query planner to live here (with the default join injected in the sedona crate when we construct the session state). ########## rust/sedona-spatial-join/src/factory.rs: ########## @@ -0,0 +1,314 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow_schema::Schema; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_common::{JoinSide, Result}; +use datafusion_physical_expr::PhysicalExpr; +use sedona_common::sedona_internal_err; +use sedona_query_planner::probe_shuffle_exec::ProbeShuffleExec; +use sedona_query_planner::spatial_join_factory::{PlanSpatialJoinArgs, SpatialJoinFactory}; +use sedona_query_planner::spatial_predicate::{DistancePredicate, KNNPredicate, RelationPredicate}; +use sedona_schema::datatypes::SedonaType; +use sedona_schema::matchers::ArgMatcher; + +use crate::exec::SpatialJoinExec; +use crate::spatial_predicate::SpatialPredicate; + +/// [SpatialJoinFactory] implementation for the default spatial join +/// +/// This struct is the entrypoint to ensuring the SedonaQueryPlanner is able +/// to instantiate the [ExecutionPlan] implemented in this crate. +#[derive(Debug)] +pub struct DefaultSpatialJoinFactory; + +impl DefaultSpatialJoinFactory { + /// Create a new default join factory + pub fn new() -> Self { + Self {} + } +} + +impl Default for DefaultSpatialJoinFactory { + fn default() -> Self { + Self::new() + } +} + +impl SpatialJoinFactory for DefaultSpatialJoinFactory { + fn plan_spatial_join( + &self, + args: &PlanSpatialJoinArgs<'_>, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + if !is_spatial_predicate_supported( + args.spatial_predicate, + &args.physical_left.schema(), + &args.physical_right.schema(), + )? { + return Ok(None); + } + + let should_swap = !matches!( + args.spatial_predicate, + SpatialPredicate::KNearestNeighbors(_) + ) && args.join_type.supports_swap() + && should_swap_join_order(args.physical_left.as_ref(), args.physical_right.as_ref())?; Review Comment: This is the implementation of the factory for the default join (the only modification here is using `args`...previously this was `plan_extension()`). 
########## rust/sedona/src/context.rs: ########## @@ -143,11 +146,17 @@ impl SedonaContext { .with_config(session_config); // Register the spatial join planner extension + let mut planner = SedonaQueryPlanner::new(); #[cfg(feature = "spatial-join")] { - state_builder = sedona_spatial_join::register_planner(state_builder)?; + use sedona_spatial_join::factory::DefaultSpatialJoinFactory; + + planner = planner.with_spatial_join_factory(Arc::new(DefaultSpatialJoinFactory::new())); } + state_builder = register_spatial_join_logical_optimizer(state_builder)?; + state_builder = state_builder.with_query_planner(Arc::new(planner)); Review Comment: This is how the spatial join factory is used when creating the session: it is attached to the `SedonaQueryPlanner` via `with_spatial_join_factory()`, and the planner is then installed on the session state builder via `with_query_planner()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
