martin-g commented on code in PR #1778:
URL: 
https://github.com/apache/datafusion-ballista/pull/1778#discussion_r3309708458


##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -523,11 +523,16 @@ pub async fn get_query_stages<
                 };
                 match stage {
                     ExecutionStage::Running(running_stage) => {
-                        summary.stage_plan = if render_tree {
-                            
Some(displayable(running_stage.plan.as_ref()).tree_render().to_string())
-                        } else {
-                            
Some(displayable(running_stage.plan.as_ref()).indent(false).to_string())
-                        };
+                        summary.stage_plan = 
running_stage.stage_metrics.as_deref().map(|m| {
+                            
format_stage_plan_with_metrics(running_stage.plan.as_ref(), m, render_tree)
+                        }).or_else(|| {
+                            // no metrics yet
+                            Some(if render_tree {
+                                
displayable(running_stage.plan.as_ref()).tree_render().to_string()
+                            } else {
+                                
displayable(running_stage.plan.as_ref()).indent(false).to_string()
+                            })
+                        });

Review Comment:
   I think this could be simplified to:
   ```rust
                           summary.stage_plan = Some(format_stage_plan_with_metrics(
                               running_stage.plan.as_ref(),
                               running_stage.stage_metrics.as_deref().unwrap_or(&[]),
                               render_tree,
                           ));
   ```



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks: 
&[Option<TaskSummary>]) -> Option<Percentile
     })
 }
 
+fn format_stage_plan_with_metrics(
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+    render_tree: bool,
+) -> String {
+    // If it is empty - fall back to render_tree
+    if stage_metrics.is_empty() {
+        return if render_tree {
+            displayable(plan).tree_render().to_string()
+        } else {
+            displayable(plan).indent(false).to_string()
+        };
+    }
+
+    let mut metric_idx = 0;
+    let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+    debug_assert_eq!(
+        metric_idx,
+        stage_metrics.len(),
+        "metric count mismatch: consumed {} but stage_metrics has {}",
+        metric_idx,
+        stage_metrics.len()
+    );
+
+    result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order 
traversal)

Review Comment:
   ```suggestion
   /// It is constructed in the same way it is collected (using pre-order traversal)
   ```
   Visiting the root first and then its children is pre-order traversal, isn't it?



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks: 
&[Option<TaskSummary>]) -> Option<Percentile
     })
 }
 
+fn format_stage_plan_with_metrics(
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+    render_tree: bool,
+) -> String {
+    // If it is empty - fall back to render_tree
+    if stage_metrics.is_empty() {
+        return if render_tree {
+            displayable(plan).tree_render().to_string()
+        } else {
+            displayable(plan).indent(false).to_string()
+        };
+    }
+
+    let mut metric_idx = 0;
+    let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+    debug_assert_eq!(
+        metric_idx,
+        stage_metrics.len(),
+        "metric count mismatch: consumed {} but stage_metrics has {}",
+        metric_idx,
+        stage_metrics.len()
+    );
+
+    result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order 
traversal)
+///
+/// For reference how the metrics are collected on the executor's side - see 
ballista-core/utils.rs
+fn format_node(
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+    metric_idx: &mut usize,
+    indent: usize,
+) -> String {
+    let metrics_set = if plan.metrics().is_some() {
+        let m = stage_metrics.get(*metric_idx);
+        *metric_idx += 1;

Review Comment:
   Shouldn't this be incremented even when `plan.metrics().is_none()`?



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks: 
&[Option<TaskSummary>]) -> Option<Percentile
     })
 }
 
+fn format_stage_plan_with_metrics(
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+    render_tree: bool,
+) -> String {
+    // If it is empty - fall back to render_tree
+    if stage_metrics.is_empty() {
+        return if render_tree {
+            displayable(plan).tree_render().to_string()
+        } else {
+            displayable(plan).indent(false).to_string()
+        };
+    }
+
+    let mut metric_idx = 0;
+    let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+    debug_assert_eq!(
+        metric_idx,
+        stage_metrics.len(),
+        "metric count mismatch: consumed {} but stage_metrics has {}",
+        metric_idx,
+        stage_metrics.len()
+    );
+
+    result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order 
traversal)
+///
+/// For reference how the metrics are collected on the executor's side - see 
ballista-core/utils.rs
+fn format_node(
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+    metric_idx: &mut usize,
+    indent: usize,
+) -> String {
+    let metrics_set = if plan.metrics().is_some() {
+        let m = stage_metrics.get(*metric_idx);
+        *metric_idx += 1;
+        m
+    } else {
+        None
+    };
+
+    let metric_str = metrics_set
+        .map(|m| {
+            let aggregated = m.aggregate_by_name();

Review Comment:
   ```suggestion
               let aggregated = m.aggregate_by_name().sorted_for_display().timestamps_removed();
   ```
   as is done at
https://github.com/apache/datafusion-ballista/blob/7a96c9480632c865c6e387a93dcba8457dca3bc2/ballista/scheduler/src/display.rs#L136-L139



##########
ballista/scheduler/src/api/handlers.rs:
##########
@@ -682,6 +687,85 @@ fn task_duration_percentiles(tasks: 
&[Option<TaskSummary>]) -> Option<Percentile
     })
 }
 
+fn format_stage_plan_with_metrics(
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+    render_tree: bool,
+) -> String {
+    // If it is empty - fall back to render_tree
+    if stage_metrics.is_empty() {
+        return if render_tree {
+            displayable(plan).tree_render().to_string()
+        } else {
+            displayable(plan).indent(false).to_string()
+        };
+    }
+
+    let mut metric_idx = 0;
+    let result = format_node(plan, stage_metrics, &mut metric_idx, 0);
+
+    debug_assert_eq!(
+        metric_idx,
+        stage_metrics.len(),
+        "metric count mismatch: consumed {} but stage_metrics has {}",
+        metric_idx,
+        stage_metrics.len()
+    );
+
+    result
+}
+
+/// Formatting the node in DFS fashion
+/// It is constructed in the same way it is collected (using in-order 
traversal)
+///
+/// For reference how the metrics are collected on the executor's side - see 
ballista-core/utils.rs
+fn format_node(
+    plan: &dyn ExecutionPlan,
+    stage_metrics: &[MetricsSet],
+    metric_idx: &mut usize,
+    indent: usize,
+) -> String {
+    let metrics_set = if plan.metrics().is_some() {
+        let m = stage_metrics.get(*metric_idx);
+        *metric_idx += 1;
+        m
+    } else {
+        None
+    };
+
+    let metric_str = metrics_set
+        .map(|m| {
+            let aggregated = m.aggregate_by_name();
+            format!(", metrics=[{}]", aggregated)
+        })
+        .unwrap_or_else(|| ", metrics=[]".to_string());
+
+    // Single-node display without children
+    let node_line = {
+        struct SingleNode<'a>(&'a dyn ExecutionPlan);
+        impl std::fmt::Display for SingleNode<'_> {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result 
{
+                self.0.fmt_as(DisplayFormatType::Default, f)
+            }
+        }
+        SingleNode(plan).to_string()
+    };
+
+    let prefix = "  ".repeat(indent);
+    let mut result = format!("{}{}{}\n", prefix, node_line, metric_str);

Review Comment:
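   This could be optimized by creating a single mutable `String` at the
top-level render call, passing it down through the recursion, and appending to
it with `write!()` instead of building a new `String` with `format!()` for
every node. That avoids many intermediate `String` allocations.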



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to