milenkovicm commented on code in PR #1647:
URL: 
https://github.com/apache/datafusion-ballista/pull/1647#discussion_r3178515741


##########
ballista/scheduler/src/planner.rs:
##########
@@ -216,6 +290,143 @@ impl DefaultDistributedPlanner {
         self.next_stage_id += 1;
         self.next_stage_id
     }
+
+    /// If `plan` is a `HashJoinExec(Partitioned)` whose smaller side fits
+    /// under the broadcast threshold, returns a rewritten
+    /// `HashJoinExec(CollectLeft)` (with a swap if the small side was on
+    /// the right) wrapped so the build subtree is a single-partition input.
+    /// Otherwise returns the input unchanged.
+    fn maybe_promote_to_broadcast(

Review Comment:
   this should be captured with current datafusion rules, should it? Also 
current planners will do join side swap 



##########
ballista/scheduler/src/planner.rs:
##########
@@ -103,8 +107,19 @@ impl DistributedPlanner for DefaultDistributedPlanner {
         config: &ConfigOptions,
     ) -> Result<Vec<Arc<dyn ShuffleWriter>>> {
         info!("planning query stages for job {job_id}");
-        let (new_plan, mut stages) =
-            self.plan_query_stages_internal(job_id, execution_plan, config)?;
+        let broadcast_threshold = config

Review Comment:
   can we use `atafusion.optimizer.hash_join_single_partition_threshold` and/or 
`datafusion.optimizer.hash_join_single_partition_threshold_rows` instead of 
bringing new config value? 



##########
ballista/scheduler/src/state/aqe/mod.rs:
##########
@@ -42,6 +42,13 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::vec;
 
+// TODO: the AQE planner does not yet apply the broadcast-join lowering

Review Comment:
   is this still relevant? 



##########
ballista/scheduler/src/planner.rs:
##########
@@ -125,17 +140,76 @@ impl DefaultDistributedPlanner {
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
+        broadcast_threshold_bytes: usize,

Review Comment:
   we have a config as parameter, broadcast should be part of it 



##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -68,20 +68,31 @@ pub struct ShuffleReaderExec {
     pub(crate) schema: SchemaRef,
     /// Each partition of a shuffle can read data from multiple locations
     pub partition: Vec<Vec<PartitionLocation>>,
+    /// When true, every call to `execute(partition)` reads `partition[0]`
+    /// (which holds the flattened concatenation of all upstream partition
+    /// locations) regardless of the partition index. Used for the
+    /// distributed broadcast hash-join lowering.
+    pub broadcast: bool,

Review Comment:
   we may need this property in `ExchangeExec`



##########
ballista/scheduler/src/planner.rs:
##########
@@ -125,17 +140,76 @@ impl DefaultDistributedPlanner {
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
+        broadcast_threshold_bytes: usize,
     ) -> Result<PartialQueryStageResult> {
+        // Apply broadcast-join promotion before recursing.
+        let execution_plan =
+            Self::maybe_promote_to_broadcast(execution_plan, 
broadcast_threshold_bytes)?;
+
         // recurse down and replace children
         if execution_plan.children().is_empty() {
             return Ok((execution_plan, vec![]));
         }
 
+        // Broadcast-join lowering: HashJoinExec(CollectLeft) gets its own

Review Comment:
   AQE planner has a set of optimizers which we could add or remove, they 
capture isolated planning rule, i have a feeling that this should be a 
additional rule 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to