avantgardnerio commented on code in PR #16985:
URL: https://github.com/apache/datafusion/pull/16985#discussion_r3196767389
##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -82,31 +85,94 @@ impl UnnestExec {
struct_column_indices: Vec<usize>,
schema: SchemaRef,
options: UnnestOptions,
- ) -> Self {
- let cache = Self::compute_properties(&input, Arc::clone(&schema));
+ ) -> Result<Self> {
+ let cache = Self::compute_properties(
+ &input,
+ &list_column_indices,
+ &struct_column_indices,
+ Arc::clone(&schema),
+ )?;
- UnnestExec {
+ Ok(UnnestExec {
input,
schema,
list_column_indices,
struct_column_indices,
options,
metrics: Default::default(),
cache,
- }
+ })
}
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
+ list_column_indices: &[ListUnnest],
+ struct_column_indices: &[usize],
schema: SchemaRef,
- ) -> PlanProperties {
- PlanProperties::new(
- EquivalenceProperties::new(schema),
- input.output_partitioning().to_owned(),
+ ) -> Result<PlanProperties> {
+ // Find out which indices are not unnested, such that they can be
copied over from the input plan
+ let input_schema = input.schema();
+ let mut unnested_indices =
BooleanBufferBuilder::new(input_schema.fields().len());
+ unnested_indices.append_n(input_schema.fields().len(), false);
+ for list_unnest in list_column_indices {
+ unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
+ }
+ for struct_unnest in struct_column_indices {
+ unnested_indices.set_bit(*struct_unnest, true)
+ }
+ let unnested_indices = unnested_indices.finish();
+ let non_unnested_indices: Vec<usize> = (0..input_schema.fields().len())
+ .filter(|idx| !unnested_indices.value(*idx))
+ .collect();
+
+ // Manually build projection mapping from non-unnested input columns
to their positions in the output
+ let input_schema = input.schema();
+ let projection_mapping: ProjectionMapping = non_unnested_indices
+ .iter()
+ .map(|&input_idx| {
+ // Find what index the input column has in the output schema
+ let input_field = input_schema.field(input_idx);
+ let output_idx = schema
+ .fields()
+ .iter()
+ .position(|output_field| output_field.name() ==
input_field.name())
+ .ok_or_else(|| {
+ exec_datafusion_err!(
+ "Non-unnested column '{}' must exist in output
schema",
Review Comment:
If some poor soul on the internet also runs into this, here is a workaround:
```
/// Like `transform_up`, but when reconstructing an `UnnestExec` with new
children,
/// adds any fields present in the child schema but missing from the output
schema.
/// This is needed because DF51's `UnnestExec::new()` validates that all
non-unnested
/// input columns exist in the output schema, which breaks bottom-up
transforms that
/// add columns to child nodes before the parent schema is updated.
fn transform_up_unnest_safe<F>(
node: Arc<dyn ExecutionPlan>,
f: &mut F,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>>
where
F: FnMut(Arc<dyn ExecutionPlan>) -> Result<Transformed<Arc<dyn
ExecutionPlan>>>,
{
// First, recursively transform all children
let mut children =
node.children().into_iter().cloned().collect::<Vec<_>>();
let mut any_changed = false;
for child in children.iter_mut() {
let result = transform_up_unnest_safe(Arc::clone(child), f)?;
if result.transformed {
any_changed = true;
*child = result.data;
}
}
// Reconstruct the node with new children if any changed
let node = if any_changed {
if let Some(exec) = node.as_any().downcast_ref::<UnnestExec>() {
// For UnnestExec, add any new fields from the child to the
output schema
// before calling new(), so the validation passes
let new_input = &children[0];
let mut fields = exec.schema().fields().to_vec();
for field in new_input.schema().fields() {
if !fields.iter().any(|f| f.name() == field.name()) {
fields.push(Arc::clone(field));
}
}
let new_exec = UnnestExec::new(
Arc::clone(new_input),
exec.list_column_indices().to_vec(),
exec.struct_column_indices().to_vec(),
Arc::new(Schema::new(fields)),
exec.options().clone(),
)?;
Arc::new(new_exec) as Arc<dyn ExecutionPlan>
} else {
node.with_new_children(children)?
}
} else {
node
};
// Now apply the user's closure
let result = f(node)?;
// If children changed but the closure didn't transform, still mark as
changed
if any_changed && !result.transformed {
Ok(Transformed::yes(result.data))
} else {
Ok(result)
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]