This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new e922af936a [branch-53] correct parquet leaf index mapping when schema contains struct cols (#20698) (#20884)
e922af936a is described below

commit e922af936a16e12d7883124e7c0f555ad091c008
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 08:49:06 2026 -0400

    [branch-53] correct parquet leaf index mapping when schema contains struct cols (#20698) (#20884)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes https://github.com/apache/datafusion/issues/20695 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20698 from
    @friendlymatthew to the branch-53 line
    
    Co-authored-by: Matthew Kim <[email protected]>
---
 datafusion/datasource-parquet/src/row_filter.rs    | 126 +++++++++++++++------
 .../test_files/parquet_filter_pushdown.slt         |  49 +++++++-
 2 files changed, 140 insertions(+), 35 deletions(-)

diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 2924208c5b..62ba53bb87 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -242,10 +242,10 @@ impl FilterCandidateBuilder {
 
         let root_indices: Vec<_> =
             required_columns.required_columns.into_iter().collect();
+
         let leaf_indices = leaf_indices_for_roots(
             &root_indices,
             metadata.file_metadata().schema_descr(),
-            required_columns.nested,
         );
 
         let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);
@@ -277,8 +277,6 @@ struct PushdownChecker<'schema> {
     projected_columns: bool,
     /// Indices into the file schema of columns required to evaluate the expression.
     required_columns: Vec<usize>,
-    /// Tracks the nested column behavior found during traversal.
-    nested_behavior: NestedColumnSupport,
     /// Whether nested list columns are supported by the predicate semantics.
     allow_list_columns: bool,
     /// The Arrow schema of the parquet file.
@@ -291,7 +289,6 @@ impl<'schema> PushdownChecker<'schema> {
             non_primitive_columns: false,
             projected_columns: false,
             required_columns: Vec::new(),
-            nested_behavior: NestedColumnSupport::PrimitiveOnly,
             allow_list_columns,
             file_schema,
         }
@@ -324,16 +321,11 @@ impl<'schema> PushdownChecker<'schema> {
     /// `None` if the type is supported and pushdown can continue.
     fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
         if self.is_nested_type_supported(data_type) {
-            // Update to ListsSupported if we haven't encountered unsupported types yet
-            if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
-                self.nested_behavior = NestedColumnSupport::ListsSupported;
-            }
             None
         } else {
             // Block pushdown for unsupported nested types:
             // - Structs (regardless of predicate support)
             // - Lists without supported predicates
-            self.nested_behavior = NestedColumnSupport::Unsupported;
             self.non_primitive_columns = true;
             Some(TreeNodeRecursion::Jump)
         }
@@ -368,7 +360,6 @@ impl<'schema> PushdownChecker<'schema> {
         self.required_columns.dedup();
         PushdownColumns {
             required_columns: self.required_columns,
-            nested: self.nested_behavior,
         }
     }
 }
@@ -391,21 +382,6 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
 ///
 /// This enum makes explicit the different states a predicate can be in
 /// with respect to nested column handling during Parquet decoding.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum NestedColumnSupport {
-    /// Expression references only primitive (non-nested) columns.
-    /// These can always be pushed down to the Parquet decoder.
-    PrimitiveOnly,
-    /// Expression references list columns with supported predicates
-    /// (e.g., array_has, array_has_all, IS NULL).
-    /// These can be pushed down to the Parquet decoder.
-    ListsSupported,
-    /// Expression references unsupported nested types (e.g., structs)
-    /// or list columns without supported predicates.
-    /// These cannot be pushed down and must be evaluated after decoding.
-    Unsupported,
-}
-
 /// Result of checking which columns are required for filter pushdown.
 #[derive(Debug)]
 struct PushdownColumns {
@@ -413,7 +389,6 @@ struct PushdownColumns {
     /// the filter expression. Must be in ascending order for correct schema
     /// projection matching.
     required_columns: Vec<usize>,
-    nested: NestedColumnSupport,
 }
 
 /// Checks if a given expression can be pushed down to the parquet decoder.
@@ -437,15 +412,13 @@ fn pushdown_columns(
 fn leaf_indices_for_roots(
     root_indices: &[usize],
     schema_descr: &SchemaDescriptor,
-    nested: NestedColumnSupport,
 ) -> Vec<usize> {
-    // For primitive-only columns, root indices ARE the leaf indices
-    if nested == NestedColumnSupport::PrimitiveOnly {
-        return root_indices.to_vec();
-    }
-
-    // For List columns, expand to the single leaf column (item field)
-    // For Struct columns (unsupported), this would expand to multiple leaves
+    // Always map root (Arrow) indices to Parquet leaf indices via the schema
+    // descriptor. Arrow root indices only equal Parquet leaf indices when the
+    // schema has no group columns (Struct, Map, etc.); when group columns
+    // exist, their children become separate leaves and shift all subsequent
+    // leaf indices.
+    // Struct columns are unsupported.
     let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
 
     (0..schema_descr.num_columns())
@@ -1088,6 +1061,91 @@ mod test {
             .expect("parsing schema")
     }
 
+    /// Regression test: when a schema has Struct columns, Arrow field indices diverge
+    /// from Parquet leaf indices (Struct children become separate leaves). The
+    /// `PrimitiveOnly` fast-path in `leaf_indices_for_roots` assumes they are equal,
+    /// so a filter on a primitive column *after* a Struct gets the wrong leaf index.
+    ///
+    /// Schema:
+    ///   Arrow indices:   col_a=0  struct_col=1  col_b=2
+    ///   Parquet leaves:  col_a=0  struct_col.x=1  struct_col.y=2  col_b=3
+    ///
+    /// A filter on col_b should project Parquet leaf 3, but the bug causes it to
+    /// project leaf 2 (struct_col.y).
+    #[test]
+    fn test_filter_pushdown_leaf_index_with_struct_in_schema() {
+        use arrow::array::{Int32Array, StringArray, StructArray};
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col_a", DataType::Int32, false),
+            Field::new(
+                "struct_col",
+                DataType::Struct(
+                    vec![
+                        Arc::new(Field::new("x", DataType::Int32, true)),
+                        Arc::new(Field::new("y", DataType::Int32, true)),
+                    ]
+                    .into(),
+                ),
+                true,
+            ),
+            Field::new("col_b", DataType::Utf8, false),
+        ]));
+
+        let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let struct_col = Arc::new(StructArray::from(vec![
+            (
+                Arc::new(Field::new("x", DataType::Int32, true)),
+                Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
+            ),
+            (
+                Arc::new(Field::new("y", DataType::Int32, true)),
+                Arc::new(Int32Array::from(vec![100, 200, 300])) as _,
+            ),
+        ]));
+        let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"]));
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, col_b])
+                .unwrap();
+
+        let file = NamedTempFile::new().expect("temp file");
+        let mut writer =
+            ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
+                .expect("writer");
+        writer.write(&batch).expect("write batch");
+        writer.close().expect("close writer");
+
+        let reader_file = file.reopen().expect("reopen file");
+        let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
+            .expect("reader builder");
+        let metadata = builder.metadata().clone();
+        let file_schema = builder.schema().clone();
+
+        // sanity check: 4 Parquet leaves, 3 Arrow fields
+        assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4);
+        assert_eq!(file_schema.fields().len(), 3);
+
+        // build a filter candidate for `col_b = 'target'` through the public API
+        let expr = col("col_b").eq(Expr::Literal(
+            ScalarValue::Utf8(Some("target".to_string())),
+            None,
+        ));
+        let expr = logical2physical(&expr, &file_schema);
+
+        let candidate = FilterCandidateBuilder::new(expr, file_schema)
+            .build(&metadata)
+            .expect("building candidate")
+            .expect("filter on primitive col_b should be pushable");
+
+        // col_b is Parquet leaf 3 (shifted by struct_col's two children).
+        assert_eq!(
+            candidate.projection.leaf_indices,
+            vec![3],
+            "leaf_indices should be [3] for col_b"
+        );
+    }
+
     /// Sanity check that the given expression could be evaluated against the given schema without any errors.
     /// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
     fn check_expression_can_evaluate_against_schema(
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index ef82bd1391..6c4383f997 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -812,7 +812,6 @@ WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
 72.4 53.4 51
 70.4 50.4 50
 
-
 statement ok
 set datafusion.execution.parquet.pushdown_filters = true;
 
@@ -842,3 +841,51 @@ DROP TABLE o2_parquet_20696;
 # Cleanup settings
 statement ok
 set datafusion.execution.parquet.pushdown_filters = false;
+
+##########
+# Regression test: filter pushdown with Struct columns in schema
+#
+# When a schema has Struct columns, Arrow field indices diverge from Parquet
+# leaf indices (Struct children become separate leaves). A filter on a
+# primitive column *after* a Struct must use the correct Parquet leaf index.
+#
+# Schema:
+#   Arrow:   col_a=0  struct_col=1              col_b=2
+#   Parquet: col_a=0  struct_col.x=1  struct_col.y=2  col_b=3
+##########
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+COPY (
+  SELECT
+    column1 as col_a,
+    column2 as struct_col,
+    column3 as col_b
+  FROM VALUES
+    (1, {x: 10, y: 100}, 'aaa'),
+    (2, {x: 20, y: 200}, 'target'),
+    (3, {x: 30, y: 300}, 'zzz')
+) TO 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet'
+STORED AS PARQUET;
+
+statement ok
+CREATE EXTERNAL TABLE t_struct_filter
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet';
+
+# Filter on col_b (the primitive column after the struct).
+# Before the fix, this returned 0 rows because the filter read struct_col.y
+# (Parquet leaf 2) instead of col_b (Parquet leaf 3).
+query IT
+SELECT col_a, col_b FROM t_struct_filter WHERE col_b = 'target';
+----
+2 target
+
+# Clean up
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+statement ok
+DROP TABLE t_struct_filter;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to