This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new e922af936a [branch-53] correct parquet leaf index mapping when schema contains struct cols (#20698) (#20884)
e922af936a is described below
commit e922af936a16e12d7883124e7c0f555ad091c008
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 08:49:06 2026 -0400
[branch-53] correct parquet leaf index mapping when schema contains struct cols (#20698) (#20884)
- Part of https://github.com/apache/datafusion/issues/19692
- Closes https://github.com/apache/datafusion/issues/20695 on branch-53
This PR:
- Backports https://github.com/apache/datafusion/pull/20698 from
@friendlymatthew to the branch-53 line
Co-authored-by: Matthew Kim
<[email protected]>
---
datafusion/datasource-parquet/src/row_filter.rs | 126 +++++++++++++++------
.../test_files/parquet_filter_pushdown.slt | 49 +++++++-
2 files changed, 140 insertions(+), 35 deletions(-)
diff --git a/datafusion/datasource-parquet/src/row_filter.rs
b/datafusion/datasource-parquet/src/row_filter.rs
index 2924208c5b..62ba53bb87 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -242,10 +242,10 @@ impl FilterCandidateBuilder {
let root_indices: Vec<_> =
required_columns.required_columns.into_iter().collect();
+
let leaf_indices = leaf_indices_for_roots(
&root_indices,
metadata.file_metadata().schema_descr(),
- required_columns.nested,
);
let projected_schema =
Arc::new(self.file_schema.project(&root_indices)?);
@@ -277,8 +277,6 @@ struct PushdownChecker<'schema> {
projected_columns: bool,
/// Indices into the file schema of columns required to evaluate the expression.
required_columns: Vec<usize>,
- /// Tracks the nested column behavior found during traversal.
- nested_behavior: NestedColumnSupport,
/// Whether nested list columns are supported by the predicate semantics.
allow_list_columns: bool,
/// The Arrow schema of the parquet file.
@@ -291,7 +289,6 @@ impl<'schema> PushdownChecker<'schema> {
non_primitive_columns: false,
projected_columns: false,
required_columns: Vec::new(),
- nested_behavior: NestedColumnSupport::PrimitiveOnly,
allow_list_columns,
file_schema,
}
@@ -324,16 +321,11 @@ impl<'schema> PushdownChecker<'schema> {
/// `None` if the type is supported and pushdown can continue.
fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
if self.is_nested_type_supported(data_type) {
- // Update to ListsSupported if we haven't encountered unsupported types yet
- if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
- self.nested_behavior = NestedColumnSupport::ListsSupported;
- }
None
} else {
// Block pushdown for unsupported nested types:
// - Structs (regardless of predicate support)
// - Lists without supported predicates
- self.nested_behavior = NestedColumnSupport::Unsupported;
self.non_primitive_columns = true;
Some(TreeNodeRecursion::Jump)
}
@@ -368,7 +360,6 @@ impl<'schema> PushdownChecker<'schema> {
self.required_columns.dedup();
PushdownColumns {
required_columns: self.required_columns,
- nested: self.nested_behavior,
}
}
}
@@ -391,21 +382,6 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
///
/// This enum makes explicit the different states a predicate can be in
/// with respect to nested column handling during Parquet decoding.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum NestedColumnSupport {
- /// Expression references only primitive (non-nested) columns.
- /// These can always be pushed down to the Parquet decoder.
- PrimitiveOnly,
- /// Expression references list columns with supported predicates
- /// (e.g., array_has, array_has_all, IS NULL).
- /// These can be pushed down to the Parquet decoder.
- ListsSupported,
- /// Expression references unsupported nested types (e.g., structs)
- /// or list columns without supported predicates.
- /// These cannot be pushed down and must be evaluated after decoding.
- Unsupported,
-}
-
/// Result of checking which columns are required for filter pushdown.
#[derive(Debug)]
struct PushdownColumns {
@@ -413,7 +389,6 @@ struct PushdownColumns {
/// the filter expression. Must be in ascending order for correct schema
/// projection matching.
required_columns: Vec<usize>,
- nested: NestedColumnSupport,
}
/// Checks if a given expression can be pushed down to the parquet decoder.
@@ -437,15 +412,13 @@ fn pushdown_columns(
fn leaf_indices_for_roots(
root_indices: &[usize],
schema_descr: &SchemaDescriptor,
- nested: NestedColumnSupport,
) -> Vec<usize> {
- // For primitive-only columns, root indices ARE the leaf indices
- if nested == NestedColumnSupport::PrimitiveOnly {
- return root_indices.to_vec();
- }
-
- // For List columns, expand to the single leaf column (item field)
- // For Struct columns (unsupported), this would expand to multiple leaves
+ // Always map root (Arrow) indices to Parquet leaf indices via the schema
+ // descriptor. Arrow root indices only equal Parquet leaf indices when the
+ // schema has no group columns (Struct, Map, etc.); when group columns
+ // exist, their children become separate leaves and shift all subsequent
+ // leaf indices.
+ // Struct columns are unsupported.
let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
(0..schema_descr.num_columns())
@@ -1088,6 +1061,91 @@ mod test {
.expect("parsing schema")
}
+ /// Regression test: when a schema has Struct columns, Arrow field indices diverge
+ /// from Parquet leaf indices (Struct children become separate leaves). The
+ /// `PrimitiveOnly` fast-path in `leaf_indices_for_roots` assumes they are equal,
+ /// so a filter on a primitive column *after* a Struct gets the wrong leaf index.
+ ///
+ /// Schema:
+ /// Arrow indices: col_a=0 struct_col=1 col_b=2
+ /// Parquet leaves: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3
+ ///
+ /// A filter on col_b should project Parquet leaf 3, but the bug causes it to
+ /// project leaf 2 (struct_col.y).
+ #[test]
+ fn test_filter_pushdown_leaf_index_with_struct_in_schema() {
+ use arrow::array::{Int32Array, StringArray, StructArray};
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("col_a", DataType::Int32, false),
+ Field::new(
+ "struct_col",
+ DataType::Struct(
+ vec![
+ Arc::new(Field::new("x", DataType::Int32, true)),
+ Arc::new(Field::new("y", DataType::Int32, true)),
+ ]
+ .into(),
+ ),
+ true,
+ ),
+ Field::new("col_b", DataType::Utf8, false),
+ ]));
+
+ let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let struct_col = Arc::new(StructArray::from(vec![
+ (
+ Arc::new(Field::new("x", DataType::Int32, true)),
+ Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
+ ),
+ (
+ Arc::new(Field::new("y", DataType::Int32, true)),
+ Arc::new(Int32Array::from(vec![100, 200, 300])) as _,
+ ),
+ ]));
+ let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"]));
+
+ let batch =
+ RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, col_b])
+ .unwrap();
+
+ let file = NamedTempFile::new().expect("temp file");
+ let mut writer =
+ ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
+ .expect("writer");
+ writer.write(&batch).expect("write batch");
+ writer.close().expect("close writer");
+
+ let reader_file = file.reopen().expect("reopen file");
+ let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+ .expect("reader builder");
+ let metadata = builder.metadata().clone();
+ let file_schema = builder.schema().clone();
+
+ // sanity check: 4 Parquet leaves, 3 Arrow fields
+ assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4);
+ assert_eq!(file_schema.fields().len(), 3);
+
+ // build a filter candidate for `col_b = 'target'` through the public API
+ let expr = col("col_b").eq(Expr::Literal(
+ ScalarValue::Utf8(Some("target".to_string())),
+ None,
+ ));
+ let expr = logical2physical(&expr, &file_schema);
+
+ let candidate = FilterCandidateBuilder::new(expr, file_schema)
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("filter on primitive col_b should be pushable");
+
+ // col_b is Parquet leaf 3 (shifted by struct_col's two children).
+ assert_eq!(
+ candidate.projection.leaf_indices,
+ vec![3],
+ "leaf_indices should be [3] for col_b"
+ );
+ }
+
/// Sanity check that the given expression could be evaluated against the given schema without any errors.
/// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
fn check_expression_can_evaluate_against_schema(
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index ef82bd1391..6c4383f997 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -812,7 +812,6 @@ WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
72.4 53.4 51
70.4 50.4 50
-
statement ok
set datafusion.execution.parquet.pushdown_filters = true;
@@ -842,3 +841,51 @@ DROP TABLE o2_parquet_20696;
# Cleanup settings
statement ok
set datafusion.execution.parquet.pushdown_filters = false;
+
+##########
+# Regression test: filter pushdown with Struct columns in schema
+#
+# When a schema has Struct columns, Arrow field indices diverge from Parquet
+# leaf indices (Struct children become separate leaves). A filter on a
+# primitive column *after* a Struct must use the correct Parquet leaf index.
+#
+# Schema:
+# Arrow: col_a=0 struct_col=1 col_b=2
+# Parquet: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3
+##########
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+COPY (
+ SELECT
+ column1 as col_a,
+ column2 as struct_col,
+ column3 as col_b
+ FROM VALUES
+ (1, {x: 10, y: 100}, 'aaa'),
+ (2, {x: 20, y: 200}, 'target'),
+ (3, {x: 30, y: 300}, 'zzz')
+) TO 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet'
+STORED AS PARQUET;
+
+statement ok
+CREATE EXTERNAL TABLE t_struct_filter
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet';
+
+# Filter on col_b (the primitive column after the struct).
+# Before the fix, this returned 0 rows because the filter read struct_col.y
+# (Parquet leaf 2) instead of col_b (Parquet leaf 3).
+query IT
+SELECT col_a, col_b FROM t_struct_filter WHERE col_b = 'target';
+----
+2 target
+
+# Clean up
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+statement ok
+DROP TABLE t_struct_filter;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]