avantgardnerio commented on code in PR #16985:
URL: https://github.com/apache/datafusion/pull/16985#discussion_r3196767389


##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -82,31 +85,94 @@ impl UnnestExec {
         struct_column_indices: Vec<usize>,
         schema: SchemaRef,
         options: UnnestOptions,
-    ) -> Self {
-        let cache = Self::compute_properties(&input, Arc::clone(&schema));
+    ) -> Result<Self> {
+        let cache = Self::compute_properties(
+            &input,
+            &list_column_indices,
+            &struct_column_indices,
+            Arc::clone(&schema),
+        )?;
 
-        UnnestExec {
+        Ok(UnnestExec {
             input,
             schema,
             list_column_indices,
             struct_column_indices,
             options,
             metrics: Default::default(),
             cache,
-        }
+        })
     }
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
+        list_column_indices: &[ListUnnest],
+        struct_column_indices: &[usize],
         schema: SchemaRef,
-    ) -> PlanProperties {
-        PlanProperties::new(
-            EquivalenceProperties::new(schema),
-            input.output_partitioning().to_owned(),
+    ) -> Result<PlanProperties> {
+        // Find out which indices are not unnested, such that they can be 
copied over from the input plan
+        let input_schema = input.schema();
+        let mut unnested_indices = 
BooleanBufferBuilder::new(input_schema.fields().len());
+        unnested_indices.append_n(input_schema.fields().len(), false);
+        for list_unnest in list_column_indices {
+            unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
+        }
+        for struct_unnest in struct_column_indices {
+            unnested_indices.set_bit(*struct_unnest, true)
+        }
+        let unnested_indices = unnested_indices.finish();
+        let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
+            .filter(|idx| !unnested_indices.value(*idx))
+            .collect();
+
+        // Manually build projection mapping from non-unnested input columns 
to their positions in the output
+        let input_schema = input.schema();
+        let projection_mapping: ProjectionMapping = non_unnested_indices
+            .iter()
+            .map(|&input_idx| {
+                // Find what index the input column has in the output schema
+                let input_field = input_schema.field(input_idx);
+                let output_idx = schema
+                    .fields()
+                    .iter()
+                    .position(|output_field| output_field.name() == 
input_field.name())
+                    .ok_or_else(|| {
+                        exec_datafusion_err!(
+                            "Non-unnested column '{}' must exist in output 
schema",

Review Comment:
   If some poor soul on the internet also runs into this `UnnestExec::new` validation error during bottom-up plan rewrites, here is a workaround:
   
   ```
   /// Like `transform_up`, but when reconstructing an `UnnestExec` with new 
children,
   /// adds any fields present in the child schema but missing from the output 
schema.
   /// This is needed because DF51's `UnnestExec::new()` validates that all 
non-unnested
   /// input columns exist in the output schema, which breaks bottom-up 
transforms that
   /// add columns to child nodes before the parent schema is updated.
   fn transform_up_unnest_safe<F>(
       node: Arc<dyn ExecutionPlan>,
       f: &mut F,
   ) -> Result<Transformed<Arc<dyn ExecutionPlan>>>
   where
       F: FnMut(Arc<dyn ExecutionPlan>) -> Result<Transformed<Arc<dyn 
ExecutionPlan>>>,
   {
       // First, recursively transform all children
       let mut children = 
node.children().into_iter().cloned().collect::<Vec<_>>();
       let mut any_changed = false;
       for child in children.iter_mut() {
           let result = transform_up_unnest_safe(Arc::clone(child), f)?;
           if result.transformed {
               any_changed = true;
               *child = result.data;
           }
       }
   
       // Reconstruct the node with new children if any changed
       let node = if any_changed {
           if let Some(exec) = node.as_any().downcast_ref::<UnnestExec>() {
               // For UnnestExec, add any new fields from the child to the 
output schema
               // before calling new(), so the validation passes
               let new_input = &children[0];
               let mut fields = exec.schema().fields().to_vec();
               for field in new_input.schema().fields() {
                   if !fields.iter().any(|f| f.name() == field.name()) {
                       fields.push(Arc::clone(field));
                   }
               }
               let new_exec = UnnestExec::new(
                   Arc::clone(new_input),
                   exec.list_column_indices().to_vec(),
                   exec.struct_column_indices().to_vec(),
                   Arc::new(Schema::new(fields)),
                   exec.options().clone(),
               )?;
               Arc::new(new_exec) as Arc<dyn ExecutionPlan>
           } else {
               node.with_new_children(children)?
           }
       } else {
           node
       };
   
       // Now apply the user's closure
       let result = f(node)?;
       // If children changed but the closure didn't transform, still mark as 
changed
       if any_changed && !result.transformed {
           Ok(Transformed::yes(result.data))
       } else {
           Ok(result)
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to