This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 664099b606 [branch-52] fix: InList Dictionary filter pushdown type mismatch (#20962) (#20997)
664099b606 is described below
commit 664099b60640097a982e63174a96d8828fe1dc0d
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Mar 18 07:16:23 2026 -0400
[branch-52] fix: InList Dictionary filter pushdown type mismatch (#20962) (#20997)
- Part of https://github.com/apache/datafusion/issues/20855
- Closes #20997 on branch-52
This PR:
- Backports https://github.com/apache/datafusion/pull/20962 from
@erratic-pattern to the branch-52 line
- Backports the related tests from
https://github.com/apache/datafusion/pull/20960
Co-authored-by: Adam Curtis <[email protected]>
---
.../physical-expr/src/expressions/in_list.rs | 545 ++++++++++++++++++++-
.../test_files/parquet_filter_pushdown.slt | 99 ++++
2 files changed, 641 insertions(+), 3 deletions(-)
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 5c2f1adcd0..379bd7edf7 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -98,11 +98,18 @@ impl StaticFilter for ArrayStaticFilter {
));
}
+ // Unwrap dictionary-encoded needles when the value type matches
+ // in_array, evaluating against the dictionary values and mapping
+ // back via keys.
downcast_dictionary_array! {
v => {
- let values_contains = self.contains(v.values().as_ref(),
negated)?;
- let result = take(&values_contains, v.keys(), None)?;
- return Ok(downcast_array(result.as_ref()))
+ // Only unwrap when the haystack (in_array) type matches
+ // the dictionary value type
+ if v.values().data_type() == self.in_array.data_type() {
+ let values_contains = self.contains(v.values().as_ref(),
negated)?;
+ let result = take(&values_contains, v.keys(), None)?;
+ return Ok(downcast_array(result.as_ref()));
+ }
}
_ => {}
}
@@ -3507,4 +3514,536 @@ mod tests {
Ok(())
}
+ /// Helper: creates an InListExpr with `static_filter = None`
+ /// to force the column-reference evaluation path.
+ ///
+ /// `expr` is the needle, `list` holds the IN-list items (literals or
+ /// column references), and `negated = true` builds `NOT IN`.
+ fn make_in_list_with_columns(
+ expr: Arc<dyn PhysicalExpr>,
+ list: Vec<Arc<dyn PhysicalExpr>>,
+ negated: bool,
+ ) -> Arc<InListExpr> {
+ Arc::new(InListExpr::new(expr, list, negated, None))
+ }
+
+ /// `a IN (1, 3)` over `[1, 2, 3, NULL]` with the static filter disabled;
+ /// the NULL needle row must evaluate to NULL rather than false.
+ #[test]
+ fn test_in_list_with_columns_int32_scalars() -> Result<()> {
+ // Column-reference path with scalar literals (bypassing static filter)
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ let col_a = col("a", &schema)?;
+ let batch = RecordBatch::try_new(
+ Arc::new(schema),
+ vec![Arc::new(Int32Array::from(vec![
+ Some(1),
+ Some(2),
+ Some(3),
+ None,
+ ]))],
+ )?;
+
+ let list = vec![
+ lit(ScalarValue::Int32(Some(1))),
+ lit(ScalarValue::Int32(Some(3))),
+ ];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // rows: 1 → true, 2 → false, 3 → true, NULL → NULL
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(true), Some(false), Some(true),
None,])
+ );
+ Ok(())
+ }
+
+ /// `a IN (b, c)` where both list items are column references, so the
+ /// set being searched differs on every row.
+ #[test]
+ fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
+ // IN list with column references
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3),
None])),
+ Arc::new(Int32Array::from(vec![
+ Some(1),
+ Some(99),
+ Some(99),
+ Some(99),
+ ])),
+ Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3),
None])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: 1 IN (1, 99) → true
+ // row 1: 2 IN (99, 99) → false
+ // row 2: 3 IN (99, 3) → true
+ // row 3: NULL IN (99, NULL) → NULL
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(true), Some(false), Some(true),
None,])
+ );
+ Ok(())
+ }
+
+ /// Utf8 variant: `a IN (b)` with a single column-reference list item.
+ #[test]
+ fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
+ // IN list with Utf8 column references
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(StringArray::from(vec!["x", "y", "z"])),
+ Arc::new(StringArray::from(vec!["x", "x", "z"])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: "x" IN ("x") → true
+ // row 1: "y" IN ("x") → false
+ // row 2: "z" IN ("z") → true
+ assert_eq!(result, &BooleanArray::from(vec![true, false, true]));
+ Ok(())
+ }
+
+ /// `a NOT IN (b)` with a column-reference list item (`negated = true`).
+ #[test]
+ fn test_in_list_with_columns_negated() -> Result<()> {
+ // NOT IN with column references
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(Int32Array::from(vec![1, 99, 3])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, true);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: 1 NOT IN (1) → false
+ // row 1: 2 NOT IN (99) → true
+ // row 2: 3 NOT IN (3) → false
+ assert_eq!(result, &BooleanArray::from(vec![false, true, false]));
+ Ok(())
+ }
+
+ /// A NULL literal in the list turns a non-match into NULL (not false),
+ /// while a match still yields true.
+ #[test]
+ fn test_in_list_with_columns_null_in_list() -> Result<()> {
+ // IN list with NULL scalar (column-reference path)
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32,
false)]);
+ let col_a = col("a", &schema)?;
+ let batch = RecordBatch::try_new(
+ Arc::new(schema),
+ vec![Arc::new(Int32Array::from(vec![1, 2]))],
+ )?;
+
+ let list = vec![
+ lit(ScalarValue::Int32(None)),
+ lit(ScalarValue::Int32(Some(1))),
+ ];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: 1 IN (NULL, 1) → true (true OR null = true)
+ // row 1: 2 IN (NULL, 1) → NULL (false OR null = null)
+ assert_eq!(result, &BooleanArray::from(vec![Some(true), None]));
+ Ok(())
+ }
+
+ /// A NaN needle matches a NaN list item in the column-reference path.
+ #[test]
+ fn test_in_list_with_columns_float_nan() -> Result<()> {
+ // Verify NaN == NaN is true in the column-reference path
+ // (consistent with Arrow's totalOrder semantics)
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Float64, false),
+ Field::new("b", DataType::Float64, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN])),
+ Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: NaN IN (NaN) → true
+ // row 1: 1.0 IN (2.0) → false
+ // row 2: NaN IN (0.0) → false
+ assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
+ Ok(())
+ }
+
+ /// Tests that short-circuit evaluation produces correct results.
+ /// When all rows match after the first list item, remaining items
+ /// should be skipped without affecting correctness.
+ #[test]
+ fn test_in_list_with_columns_short_circuit() -> Result<()> {
+ // a IN (b, c) where b already matches every row of a
+ // The short-circuit should skip evaluating c
+ // NOTE(review): only the final result is asserted below; this test
+ // does not observe whether `c` was actually skipped.
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(Int32Array::from(vec![1, 2, 3])), // b == a for all
rows
+ Arc::new(Int32Array::from(vec![99, 99, 99])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ assert_eq!(result, &BooleanArray::from(vec![true, true, true]));
+ Ok(())
+ }
+
+ /// Short-circuit must NOT skip when nulls are present (three-valued
logic).
+ /// Even if all non-null values are true, null rows keep the result as
null.
+ /// Here row 1 has a NULL needle and must stay NULL in the output.
+ #[test]
+ fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
+ // a IN (b, c) where a has nulls
+ // Even if b matches all non-null rows, result should preserve nulls
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
+ Arc::new(Int32Array::from(vec![1, 2, 3])), // matches non-null
rows
+ Arc::new(Int32Array::from(vec![99, 99, 99])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: 1 IN (1, 99) → true
+ // row 1: NULL IN (2, 99) → NULL
+ // row 2: 3 IN (3, 99) → true
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(true), None, Some(true)])
+ );
+ Ok(())
+ }
+
+ /// Tests the make_comparator + collect_bool fallback path using
+ /// struct column references (nested types don't support arrow_eq).
+ ///
+ /// Covers both `IN` and `NOT IN`; the NULL struct row in `a` must yield
+ /// NULL in both cases.
+ #[test]
+ fn test_in_list_with_columns_struct() -> Result<()> {
+ let struct_fields = Fields::from(vec![
+ Field::new("x", DataType::Int32, false),
+ Field::new("y", DataType::Utf8, false),
+ ]);
+ let struct_dt = DataType::Struct(struct_fields.clone());
+
+ let schema = Schema::new(vec![
+ Field::new("a", struct_dt.clone(), true),
+ Field::new("b", struct_dt.clone(), false),
+ Field::new("c", struct_dt.clone(), false),
+ ]);
+
+ // a: [{1,"a"}, {2,"b"}, NULL, {4,"d"}]
+ // b: [{1,"a"}, {9,"z"}, {3,"c"}, {4,"d"}]
+ // c: [{9,"z"}, {2,"b"}, {9,"z"}, {9,"z"}]
+ // (row 2 of `a` is marked NULL via the struct validity mask below)
+ let a = Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+ Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+ ],
+ Some(vec![true, true, false, true].into()),
+ ));
+ let b = Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
+ Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
+ ],
+ None,
+ ));
+ let c = Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
+ Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
+ ],
+ None,
+ ));
+
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b,
c])?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: {1,"a"} IN ({1,"a"}, {9,"z"}) → true (matches b)
+ // row 1: {2,"b"} IN ({9,"z"}, {2,"b"}) → true (matches c)
+ // row 2: NULL IN ({3,"c"}, {9,"z"}) → NULL
+ // row 3: {4,"d"} IN ({4,"d"}, {9,"z"}) → true (matches b)
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
+ );
+
+ // Also test NOT IN
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, true);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: {1,"a"} NOT IN ({1,"a"}, {9,"z"}) → false
+ // row 1: {2,"b"} NOT IN ({9,"z"}, {2,"b"}) → false
+ // row 2: NULL NOT IN ({3,"c"}, {9,"z"}) → NULL
+ // row 3: {4,"d"} NOT IN ({4,"d"}, {9,"z"}) → false
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(false), Some(false), None,
Some(false)])
+ );
+ Ok(())
+ }
+
+ // -----------------------------------------------------------------------
+ // Tests for try_new_from_array: evaluates `needle IN in_array`.
+ //
+ // This exercises the code path used by HashJoin dynamic filter pushdown,
+ // where in_array is built directly from the join's build-side arrays.
+ // Unlike try_new (used by SQL IN expressions), which always produces a
+ // non-Dictionary in_array because evaluate_list() flattens Dictionary
+ // scalars, try_new_from_array passes the array directly and can produce
+ // a Dictionary in_array.
+ // -----------------------------------------------------------------------
+
+ /// Wraps `array` in a `Dictionary(Int32, _)` with identity keys `0..len`,
+ /// so row `i` of the result decodes to `array[i]`.
+ fn wrap_in_dict(array: ArrayRef) -> ArrayRef {
+ // NOTE(review): `array.len() as i32` silently truncates past i32::MAX;
+ // acceptable for the tiny fixtures used in these tests.
+ let keys = Int32Array::from((0..array.len() as
i32).collect::<Vec<_>>());
+ Arc::new(DictionaryArray::new(keys, array))
+ }
+
+ /// Evaluates `needle IN in_array` via try_new_from_array, the same
+ /// path used by HashJoin dynamic filter pushdown (not the SQL literal
+ /// IN path which goes through try_new).
+ ///
+ /// Both constructing the expression and evaluating it happen here, so an
+ /// `Err` may come from either step. The needle column `a` is declared
+ /// non-nullable, so `needle` is presumably expected to contain no nulls
+ /// (TODO confirm whether `RecordBatch::try_new` enforces this).
+ fn eval_in_list_from_array(
+ needle: ArrayRef,
+ in_array: ArrayRef,
+ ) -> Result<BooleanArray> {
+ let schema =
+ Schema::new(vec![Field::new("a", needle.data_type().clone(),
false)]);
+ let col_a = col("a", &schema)?;
+ let expr = Arc::new(InListExpr::try_new_from_array(col_a, in_array,
false)?)
+ as Arc<dyn PhysicalExpr>;
+ let batch = RecordBatch::try_new(Arc::new(schema), vec![needle])?;
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ Ok(as_boolean_array(&result).clone())
+ }
+
+ /// Every needle/in_array type pairing expected to succeed (primitive,
+ /// Utf8, Dictionary, Struct, Struct-of-Dictionary) must return the same
+ /// `[true, false, true]` result.
+ #[test]
+ fn test_in_list_from_array_type_combinations() -> Result<()> {
+ use arrow::compute::cast;
+
+ // All cases: needle[0] and needle[2] match, needle[1] does not.
+ let expected = BooleanArray::from(vec![Some(true), Some(false),
Some(true)]);
+
+ // Base arrays cast to each target type
+ let base_in = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef;
+ let base_needle = Arc::new(Int64Array::from(vec![1i64, 4, 2])) as
ArrayRef;
+
+ // Test all specializations in instantiate_static_filter
+ let primitive_types = vec![
+ DataType::Int8,
+ DataType::Int16,
+ DataType::Int32,
+ DataType::Int64,
+ DataType::UInt8,
+ DataType::UInt16,
+ DataType::UInt32,
+ DataType::UInt64,
+ DataType::Float32,
+ DataType::Float64,
+ ];
+
+ for dt in &primitive_types {
+ let in_array = cast(&base_in, dt)?;
+ let needle = cast(&base_needle, dt)?;
+
+ // T in_array, T needle
+ assert_eq!(
+ expected,
+ eval_in_list_from_array(Arc::clone(&needle),
Arc::clone(&in_array))?,
+ "same-type failed for {dt:?}"
+ );
+
+ // T in_array, Dict(Int32, T) needle
+ assert_eq!(
+ expected,
+ eval_in_list_from_array(wrap_in_dict(needle), in_array)?,
+ "dict-needle failed for {dt:?}"
+ );
+ }
+
+ // Utf8 (falls through to ArrayStaticFilter)
+ let utf8_in = Arc::new(StringArray::from(vec!["a", "b", "c"])) as
ArrayRef;
+ let utf8_needle = Arc::new(StringArray::from(vec!["a", "d", "b"])) as
ArrayRef;
+
+ // Utf8 in_array, Utf8 needle
+ assert_eq!(
+ expected,
+ eval_in_list_from_array(Arc::clone(&utf8_needle),
Arc::clone(&utf8_in),)?
+ );
+
+ // Utf8 in_array, Dict(Utf8) needle
+ assert_eq!(
+ expected,
+ eval_in_list_from_array(
+ wrap_in_dict(Arc::clone(&utf8_needle)),
+ Arc::clone(&utf8_in),
+ )?
+ );
+
+ // Dict(Utf8) in_array, Dict(Utf8) needle: the #20937 bug
+ assert_eq!(
+ expected,
+ eval_in_list_from_array(
+ wrap_in_dict(Arc::clone(&utf8_needle)),
+ wrap_in_dict(Arc::clone(&utf8_in)),
+ )?
+ );
+
+ // Struct in_array, Struct needle: multi-column join
+ // Needle rows ("a",1) and ("b",2) exist in in_array; ("d",4) does not.
+ let struct_fields = Fields::from(vec![
+ Field::new("c0", DataType::Utf8, true),
+ Field::new("c1", DataType::Int64, true),
+ ]);
+ let make_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
+ let pairs: Vec<(FieldRef, ArrayRef)> =
+ struct_fields.iter().cloned().zip([c0, c1]).collect();
+ Arc::new(StructArray::from(pairs))
+ };
+ assert_eq!(
+ expected,
+ eval_in_list_from_array(
+ make_struct(
+ Arc::clone(&utf8_needle),
+ Arc::new(Int64Array::from(vec![1, 4, 2])),
+ ),
+ make_struct(
+ Arc::clone(&utf8_in),
+ Arc::new(Int64Array::from(vec![1, 2, 3])),
+ ),
+ )?
+ );
+
+ // Struct with Dict fields: multi-column Dict join
+ let dict_struct_fields = Fields::from(vec![
+ Field::new(
+ "c0",
+ DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8)),
+ true,
+ ),
+ Field::new("c1", DataType::Int64, true),
+ ]);
+ let make_dict_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
+ let pairs: Vec<(FieldRef, ArrayRef)> =
+ dict_struct_fields.iter().cloned().zip([c0, c1]).collect();
+ Arc::new(StructArray::from(pairs))
+ };
+ assert_eq!(
+ expected,
+ eval_in_list_from_array(
+ make_dict_struct(
+ wrap_in_dict(Arc::clone(&utf8_needle)),
+ Arc::new(Int64Array::from(vec![1, 4, 2])),
+ ),
+ make_dict_struct(
+ wrap_in_dict(Arc::clone(&utf8_in)),
+ Arc::new(Int64Array::from(vec![1, 2, 3])),
+ ),
+ )?
+ );
+
+ Ok(())
+ }
+
+ /// Needle/in_array pairings with incompatible types must return an error
+ /// rather than a wrong answer; each case pins part of the error message.
+ #[test]
+ fn test_in_list_from_array_type_mismatch_errors() -> Result<()> {
+ // Utf8 needle, Dict(Utf8) in_array
+ let err = eval_in_list_from_array(
+ Arc::new(StringArray::from(vec!["a", "d", "b"])),
+ wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
+ )
+ .unwrap_err()
+ .to_string();
+ assert!(
+ err.contains("Can't compare arrays of different types"),
+ "{err}"
+ );
+
+ // Dict(Utf8) needle, Int64 in_array: specialized Int64StaticFilter
+ // rejects the Utf8 dictionary values at construction time
+ // NOTE(review): eval_in_list_from_array both constructs and evaluates
+ // the expression, so this test alone does not show which step fails.
+ let err = eval_in_list_from_array(
+ wrap_in_dict(Arc::new(StringArray::from(vec!["a", "d", "b"]))),
+ Arc::new(Int64Array::from(vec![1, 2, 3])),
+ )
+ .unwrap_err()
+ .to_string();
+ assert!(err.contains("Failed to downcast"), "{err}");
+
+ // Dict(Int64) needle, Dict(Utf8) in_array: both Dict but different
+ // value types, make_comparator rejects the comparison
+ let err = eval_in_list_from_array(
+ wrap_in_dict(Arc::new(Int64Array::from(vec![1, 4, 2]))),
+ wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
+ )
+ .unwrap_err()
+ .to_string();
+ assert!(
+ err.contains("Can't compare arrays of different types"),
+ "{err}"
+ );
+ Ok(())
+ }
}
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 5e643273ba..d306f94ae3 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -663,3 +663,102 @@ DROP TABLE o2_parquet_20696;
# Cleanup settings
statement ok
set datafusion.execution.parquet.pushdown_filters = false;
+
+##########
+# Regression test: filter pushdown with Struct columns in schema
+#
+# When a schema has Struct columns, Arrow field indices diverge from Parquet
+# leaf indices (Struct children become separate leaves). A filter on a
+# primitive column *after* a Struct must use the correct Parquet leaf index.
+#
+# Schema:
+# Arrow: col_a=0 struct_col=1 col_b=2
+# Parquet: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3
+##########
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+# Write three rows with the struct column in the middle; only the row with
+# col_a = 2 has col_b = 'target'.
+statement ok
+COPY (
+ SELECT
+ column1 as col_a,
+ column2 as struct_col,
+ column3 as col_b
+ FROM VALUES
+ (1, {x: 10, y: 100}, 'aaa'),
+ (2, {x: 20, y: 200}, 'target'),
+ (3, {x: 30, y: 300}, 'zzz')
+) TO 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet'
+STORED AS PARQUET;
+
+statement ok
+CREATE EXTERNAL TABLE t_struct_filter
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet';
+
+# Filter on col_b (the primitive column after the struct).
+# Before the fix, this returned 0 rows because the filter read struct_col.y
+# (Parquet leaf 2) instead of col_b (Parquet leaf 3).
+query IT
+SELECT col_a, col_b FROM t_struct_filter WHERE col_b = 'target';
+----
+2 target
+
+# Clean up
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+statement ok
+DROP TABLE t_struct_filter;
+
+##########
+# Regression test for https://github.com/apache/datafusion/issues/20937
+#
+# Dynamic filter pushdown fails when joining VALUES against
+# Dictionary-encoded Parquet columns. The InListExpr's ArrayStaticFilter
+# unwraps the needle Dictionary but not the stored in_array, causing a
+# make_comparator(Utf8, Dictionary) type mismatch.
+##########
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+set datafusion.execution.parquet.reorder_filters = true;
+
+statement ok
+COPY (
+ SELECT
+ arrow_cast(chr(65 + (row_num % 26)), 'Dictionary(Int32, Utf8)') as tag1,
+ row_num * 1.0 as value
+ FROM (SELECT unnest(range(0, 10000)) as row_num)
+) TO 'test_files/scratch/parquet_filter_pushdown/dict_filter_bug.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE dict_filter_bug
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/dict_filter_bug.parquet';
+
+# tag1 = chr(65 + row_num % 26), so 'A' occurs at row_num 0, 26, 52, 78, ...;
+# ordering by (tag1, value) and taking 4 rows returns only 'A' rows.
+query TR
+SELECT t.tag1, t.value
+FROM dict_filter_bug t
+JOIN (VALUES ('A'), ('B')) AS v(c1)
+ON t.tag1 = v.c1
+ORDER BY t.tag1, t.value
+LIMIT 4;
+----
+A 0
+A 26
+A 52
+A 78
+
+# Cleanup
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+statement ok
+set datafusion.execution.parquet.reorder_filters = false;
+
+statement ok
+DROP TABLE dict_filter_bug;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]