milenkovicm commented on code in PR #1647:
URL: https://github.com/apache/datafusion-ballista/pull/1647#discussion_r3178515741
##########
ballista/scheduler/src/planner.rs:
##########
@@ -216,6 +290,143 @@ impl DefaultDistributedPlanner {
self.next_stage_id += 1;
self.next_stage_id
}
+
+ /// If `plan` is a `HashJoinExec(Partitioned)` whose smaller side fits
+ /// under the broadcast threshold, returns a rewritten
+ /// `HashJoinExec(CollectLeft)` (with a swap if the small side was on
+ /// the right) wrapped so the build subtree is a single-partition input.
+ /// Otherwise returns the input unchanged.
+ fn maybe_promote_to_broadcast(
Review Comment:
shouldn't this already be covered by the current DataFusion rules? The
current planners also perform the join-side swap.
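The decision the PR's `maybe_promote_to_broadcast` doc comment describes (promote to `CollectLeft`, swapping inputs if the small side is on the right) can be sketched as follows. This is a hypothetical model, not the PR's actual code: `Promotion` and `decide_promotion` are illustrative names, and real plans would carry statistics rather than plain byte counts.

```rust
// Sketch of the broadcast-promotion decision: pick CollectLeft when one
// side's estimated size fits under the threshold, swapping inputs so the
// small side becomes the build (left) side. Names are hypothetical.
#[derive(Debug, PartialEq)]
enum Promotion {
    KeepPartitioned,
    CollectLeft { swap_inputs: bool },
}

fn decide_promotion(
    left_bytes: Option<usize>,
    right_bytes: Option<usize>,
    threshold_bytes: usize,
) -> Promotion {
    match (left_bytes, right_bytes) {
        // Left side fits: broadcast it as-is (it is already the build side).
        (Some(l), _) if l <= threshold_bytes => {
            Promotion::CollectLeft { swap_inputs: false }
        }
        // Right side fits: swap so it becomes the build side.
        (_, Some(r)) if r <= threshold_bytes => {
            Promotion::CollectLeft { swap_inputs: true }
        }
        // Unknown statistics or both sides too large: leave the join alone.
        _ => Promotion::KeepPartitioned,
    }
}
```

The reviewer's point is that DataFusion's own physical optimizer already performs this kind of side-selection, so duplicating it in the Ballista planner may be redundant.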
##########
ballista/scheduler/src/planner.rs:
##########
@@ -103,8 +107,19 @@ impl DistributedPlanner for DefaultDistributedPlanner {
config: &ConfigOptions,
) -> Result<Vec<Arc<dyn ShuffleWriter>>> {
info!("planning query stages for job {job_id}");
- let (new_plan, mut stages) =
- self.plan_query_stages_internal(job_id, execution_plan, config)?;
+ let broadcast_threshold = config
Review Comment:
can we use `datafusion.optimizer.hash_join_single_partition_threshold` and/or
`datafusion.optimizer.hash_join_single_partition_threshold_rows` instead of
introducing a new config value?
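The suggestion, sketched below with a local stand-in for DataFusion's `ConfigOptions` (the real struct does expose `optimizer.hash_join_single_partition_threshold` and `..._threshold_rows`, but the structs here are mocks so the example is self-contained):

```rust
// Mock of the relevant corner of DataFusion's ConfigOptions. The real keys
// are datafusion.optimizer.hash_join_single_partition_threshold (bytes) and
// ..._threshold_rows (rows); the point is to reuse them rather than add a
// Ballista-specific broadcast threshold.
struct OptimizerOptions {
    hash_join_single_partition_threshold: usize,      // bytes
    hash_join_single_partition_threshold_rows: usize, // rows
}

struct ConfigOptions {
    optimizer: OptimizerOptions,
}

// Broadcast eligibility derived from the existing knobs.
fn fits_broadcast(config: &ConfigOptions, est_bytes: usize, est_rows: usize) -> bool {
    est_bytes <= config.optimizer.hash_join_single_partition_threshold
        && est_rows <= config.optimizer.hash_join_single_partition_threshold_rows
}
```

Reusing the existing keys keeps scheduler behavior consistent with how DataFusion itself decides when a hash join collapses to a single partition.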
##########
ballista/scheduler/src/state/aqe/mod.rs:
##########
@@ -42,6 +42,13 @@ use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::vec;
+// TODO: the AQE planner does not yet apply the broadcast-join lowering
Review Comment:
is this still relevant?
##########
ballista/scheduler/src/planner.rs:
##########
@@ -125,17 +140,76 @@ impl DefaultDistributedPlanner {
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
+ broadcast_threshold_bytes: usize,
Review Comment:
we already pass the config as a parameter; the broadcast threshold should be
part of it rather than a separate argument
##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -68,20 +68,31 @@ pub struct ShuffleReaderExec {
pub(crate) schema: SchemaRef,
/// Each partition of a shuffle can read data from multiple locations
pub partition: Vec<Vec<PartitionLocation>>,
+ /// When true, every call to `execute(partition)` reads `partition[0]`
+ /// (which holds the flattened concatenation of all upstream partition
+ /// locations) regardless of the partition index. Used for the
+ /// distributed broadcast hash-join lowering.
+ pub broadcast: bool,
Review Comment:
we may need this property in `ExchangeExec`
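The read-path semantics of the new `broadcast` flag (every `execute(partition)` call reads the flattened locations in `partition[0]`) can be sketched as an index-mapping helper. This is an illustrative model, not the PR's code; `PartitionLocation` is a stand-in type:

```rust
// Stand-in for Ballista's real PartitionLocation struct.
type PartitionLocation = String;

/// With `broadcast = true`, every output partition reads the same flattened
/// location list stored at index 0; otherwise each partition reads its own
/// slot. This mirrors the ShuffleReaderExec behavior described in the diff.
fn locations_for(
    partitions: &[Vec<PartitionLocation>],
    broadcast: bool,
    idx: usize,
) -> &[PartitionLocation] {
    if broadcast {
        &partitions[0]
    } else {
        &partitions[idx]
    }
}
```

The reviewer's note suggests that if `ExchangeExec` grows the same broadcast semantics, this mapping would belong there too rather than only in the shuffle reader.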
##########
ballista/scheduler/src/planner.rs:
##########
@@ -125,17 +140,76 @@ impl DefaultDistributedPlanner {
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
+ broadcast_threshold_bytes: usize,
) -> Result<PartialQueryStageResult> {
+ // Apply broadcast-join promotion before recursing.
+ let execution_plan =
+ Self::maybe_promote_to_broadcast(execution_plan, broadcast_threshold_bytes)?;
+
// recurse down and replace children
if execution_plan.children().is_empty() {
return Ok((execution_plan, vec![]));
}
+ // Broadcast-join lowering: HashJoinExec(CollectLeft) gets its own
Review Comment:
the AQE planner has a set of optimizers which we can add or remove; each one
captures an isolated planning rule. I have a feeling this should be an
additional rule rather than inline logic in the planner
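The shape the reviewer is pointing at, sketched as a minimal rule trait. Everything here is hypothetical: the trait and type names are illustrative stand-ins for whatever rule abstraction the AQE planner actually uses, and the plan is reduced to a toy enum:

```rust
// Hypothetical shape of an isolated AQE planning rule; in the real codebase
// this would operate on Arc<dyn ExecutionPlan>, not a toy enum.
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    PartitionedHashJoin { build_bytes: usize },
    BroadcastHashJoin { build_bytes: usize },
}

trait PlanRule {
    fn name(&self) -> &str;
    fn optimize(&self, plan: Plan) -> Plan;
}

/// Broadcast-join promotion packaged as one self-contained rule, so it can
/// be registered alongside (or removed from) the other AQE optimizers.
struct BroadcastJoinRule {
    threshold_bytes: usize,
}

impl PlanRule for BroadcastJoinRule {
    fn name(&self) -> &str {
        "broadcast_join_promotion"
    }

    fn optimize(&self, plan: Plan) -> Plan {
        match plan {
            Plan::PartitionedHashJoin { build_bytes }
                if build_bytes <= self.threshold_bytes =>
            {
                Plan::BroadcastHashJoin { build_bytes }
            }
            other => other,
        }
    }
}
```

Keeping the lowering as a named rule makes it independently testable and trivially removable, which is the isolation property the reviewer describes.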
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]