This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 01df975faf [branch-53] fix: InList Dictionary filter pushdown type mismatch (#20962) (#20996)
01df975faf is described below

commit 01df975fafff5c0c8c9b7970dfa2cc1401432594
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 19 10:27:13 2026 -0400

    [branch-53] fix: InList Dictionary filter pushdown type mismatch (#20962) (#20996)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes #20996 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20962 from
    @erratic-pattern to the branch-53 line
    - Backports the related tests from
    https://github.com/apache/datafusion/pull/20960
    
    Co-authored-by: Adam Curtis <[email protected]>
---
 .../physical-expr/src/expressions/in_list.rs       | 357 ++++++++++++++++++++-
 .../test_files/parquet_filter_pushdown.slt         |  51 +++
 2 files changed, 405 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 44a6572f53..6c81fcc11c 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -99,11 +99,18 @@ impl StaticFilter for ArrayStaticFilter {
             ));
         }
 
+        // Unwrap dictionary-encoded needles when the value type matches
+        // in_array, evaluating against the dictionary values and mapping
+        // back via keys.
         downcast_dictionary_array! {
             v => {
-                let values_contains = self.contains(v.values().as_ref(), negated)?;
-                let result = take(&values_contains, v.keys(), None)?;
-                return Ok(downcast_array(result.as_ref()))
+                // Only unwrap when the haystack (in_array) type matches
+                // the dictionary value type
+                if v.values().data_type() == self.in_array.data_type() {
+                    let values_contains = self.contains(v.values().as_ref(), negated)?;
+                    let result = take(&values_contains, v.keys(), None)?;
+                    return Ok(downcast_array(result.as_ref()));
+                }
             }
             _ => {}
         }
@@ -3724,4 +3731,348 @@ mod tests {
         assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
         Ok(())
     }
+    /// Tests that short-circuit evaluation produces correct results.
+    /// When all rows match after the first list item, remaining items
+    /// should be skipped without affecting correctness.
+    #[test]
+    fn test_in_list_with_columns_short_circuit() -> Result<()> {
+        // a IN (b, c) where b already matches every row of a
+        // The short-circuit should skip evaluating c
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![1, 2, 3])), // b == a for all rows
+                Arc::new(Int32Array::from(vec![99, 99, 99])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        assert_eq!(result, &BooleanArray::from(vec![true, true, true]));
+        Ok(())
+    }
+
+    /// Short-circuit must NOT skip when nulls are present (three-valued logic).
+    /// Even if all non-null values are true, null rows keep the result as null.
+    #[test]
+    fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
+        // a IN (b, c) where a has nulls
+        // Even if b matches all non-null rows, result should preserve nulls
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
+                Arc::new(Int32Array::from(vec![1, 2, 3])), // matches non-null rows
+                Arc::new(Int32Array::from(vec![99, 99, 99])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: 1 IN (1, 99) → true
+        // row 1: NULL IN (2, 99) → NULL
+        // row 2: 3 IN (3, 99) → true
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(true), None, Some(true)])
+        );
+        Ok(())
+    }
+
+    /// Tests the make_comparator + collect_bool fallback path using
+    /// struct column references (nested types don't support arrow_eq).
+    #[test]
+    fn test_in_list_with_columns_struct() -> Result<()> {
+        let struct_fields = Fields::from(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("y", DataType::Utf8, false),
+        ]);
+        let struct_dt = DataType::Struct(struct_fields.clone());
+
+        let schema = Schema::new(vec![
+            Field::new("a", struct_dt.clone(), true),
+            Field::new("b", struct_dt.clone(), false),
+            Field::new("c", struct_dt.clone(), false),
+        ]);
+
+        // a: [{1,"a"}, {2,"b"}, NULL,    {4,"d"}]
+        // b: [{1,"a"}, {9,"z"}, {3,"c"}, {4,"d"}]
+        // c: [{9,"z"}, {2,"b"}, {9,"z"}, {9,"z"}]
+        let a = Arc::new(StructArray::new(
+            struct_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            ],
+            Some(vec![true, true, false, true].into()),
+        ));
+        let b = Arc::new(StructArray::new(
+            struct_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
+                Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
+            ],
+            None,
+        ));
+        let c = Arc::new(StructArray::new(
+            struct_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
+                Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
+            ],
+            None,
+        ));
+
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: {1,"a"} IN ({1,"a"}, {9,"z"}) → true  (matches b)
+        // row 1: {2,"b"} IN ({9,"z"}, {2,"b"}) → true  (matches c)
+        // row 2: NULL    IN ({3,"c"}, {9,"z"}) → NULL
+        // row 3: {4,"d"} IN ({4,"d"}, {9,"z"}) → true  (matches b)
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
+        );
+
+        // Also test NOT IN
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, true);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: {1,"a"} NOT IN ({1,"a"}, {9,"z"}) → false
+        // row 1: {2,"b"} NOT IN ({9,"z"}, {2,"b"}) → false
+        // row 2: NULL    NOT IN ({3,"c"}, {9,"z"}) → NULL
+        // row 3: {4,"d"} NOT IN ({4,"d"}, {9,"z"}) → false
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(false), Some(false), None, Some(false)])
+        );
+        Ok(())
+    }
+
+    // -----------------------------------------------------------------------
+    // Tests for try_new_from_array: evaluates `needle IN in_array`.
+    //
+    // This exercises the code path used by HashJoin dynamic filter pushdown,
+    // where in_array is built directly from the join's build-side arrays.
+    // Unlike try_new (used by SQL IN expressions), which always produces a
+    // non-Dictionary in_array because evaluate_list() flattens Dictionary
+    // scalars, try_new_from_array passes the array directly and can produce
+    // a Dictionary in_array.
+    // -----------------------------------------------------------------------
+
+    fn wrap_in_dict(array: ArrayRef) -> ArrayRef {
+        let keys = Int32Array::from((0..array.len() as i32).collect::<Vec<_>>());
+        Arc::new(DictionaryArray::new(keys, array))
+    }
+
+    /// Evaluates `needle IN in_array` via try_new_from_array, the same
+    /// path used by HashJoin dynamic filter pushdown (not the SQL literal
+    /// IN path which goes through try_new).
+    fn eval_in_list_from_array(
+        needle: ArrayRef,
+        in_array: ArrayRef,
+    ) -> Result<BooleanArray> {
+        let schema =
+            Schema::new(vec![Field::new("a", needle.data_type().clone(), false)]);
+        let col_a = col("a", &schema)?;
+        let expr = Arc::new(InListExpr::try_new_from_array(col_a, in_array, false)?)
+            as Arc<dyn PhysicalExpr>;
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![needle])?;
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        Ok(as_boolean_array(&result).clone())
+    }
+
+    #[test]
+    fn test_in_list_from_array_type_combinations() -> Result<()> {
+        use arrow::compute::cast;
+
+        // All cases: needle[0] and needle[2] match, needle[1] does not.
+        let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
+
+        // Base arrays cast to each target type
+        let base_in = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef;
+        let base_needle = Arc::new(Int64Array::from(vec![1i64, 4, 2])) as ArrayRef;
+
+        // Test all specializations in instantiate_static_filter
+        let primitive_types = vec![
+            DataType::Int8,
+            DataType::Int16,
+            DataType::Int32,
+            DataType::Int64,
+            DataType::UInt8,
+            DataType::UInt16,
+            DataType::UInt32,
+            DataType::UInt64,
+            DataType::Float32,
+            DataType::Float64,
+        ];
+
+        for dt in &primitive_types {
+            let in_array = cast(&base_in, dt)?;
+            let needle = cast(&base_needle, dt)?;
+
+            // T in_array, T needle
+            assert_eq!(
+                expected,
+                eval_in_list_from_array(Arc::clone(&needle), Arc::clone(&in_array))?,
+                "same-type failed for {dt:?}"
+            );
+
+            // T in_array, Dict(Int32, T) needle
+            assert_eq!(
+                expected,
+                eval_in_list_from_array(wrap_in_dict(needle), in_array)?,
+                "dict-needle failed for {dt:?}"
+            );
+        }
+
+        // Utf8 (falls through to ArrayStaticFilter)
+        let utf8_in = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
+        let utf8_needle = Arc::new(StringArray::from(vec!["a", "d", "b"])) as ArrayRef;
+
+        // Utf8 in_array, Utf8 needle
+        assert_eq!(
+            expected,
+            eval_in_list_from_array(Arc::clone(&utf8_needle), Arc::clone(&utf8_in),)?
+        );
+
+        // Utf8 in_array, Dict(Utf8) needle
+        assert_eq!(
+            expected,
+            eval_in_list_from_array(
+                wrap_in_dict(Arc::clone(&utf8_needle)),
+                Arc::clone(&utf8_in),
+            )?
+        );
+
+        // Dict(Utf8) in_array, Dict(Utf8) needle: the #20937 bug
+        assert_eq!(
+            expected,
+            eval_in_list_from_array(
+                wrap_in_dict(Arc::clone(&utf8_needle)),
+                wrap_in_dict(Arc::clone(&utf8_in)),
+            )?
+        );
+
+        // Struct in_array, Struct needle: multi-column join
+        let struct_fields = Fields::from(vec![
+            Field::new("c0", DataType::Utf8, true),
+            Field::new("c1", DataType::Int64, true),
+        ]);
+        let make_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
+            let pairs: Vec<(FieldRef, ArrayRef)> =
+                struct_fields.iter().cloned().zip([c0, c1]).collect();
+            Arc::new(StructArray::from(pairs))
+        };
+        assert_eq!(
+            expected,
+            eval_in_list_from_array(
+                make_struct(
+                    Arc::clone(&utf8_needle),
+                    Arc::new(Int64Array::from(vec![1, 4, 2])),
+                ),
+                make_struct(
+                    Arc::clone(&utf8_in),
+                    Arc::new(Int64Array::from(vec![1, 2, 3])),
+                ),
+            )?
+        );
+
+        // Struct with Dict fields: multi-column Dict join
+        let dict_struct_fields = Fields::from(vec![
+            Field::new(
+                "c0",
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                true,
+            ),
+            Field::new("c1", DataType::Int64, true),
+        ]);
+        let make_dict_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
+            let pairs: Vec<(FieldRef, ArrayRef)> =
+                dict_struct_fields.iter().cloned().zip([c0, c1]).collect();
+            Arc::new(StructArray::from(pairs))
+        };
+        assert_eq!(
+            expected,
+            eval_in_list_from_array(
+                make_dict_struct(
+                    wrap_in_dict(Arc::clone(&utf8_needle)),
+                    Arc::new(Int64Array::from(vec![1, 4, 2])),
+                ),
+                make_dict_struct(
+                    wrap_in_dict(Arc::clone(&utf8_in)),
+                    Arc::new(Int64Array::from(vec![1, 2, 3])),
+                ),
+            )?
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_list_from_array_type_mismatch_errors() -> Result<()> {
+        // Utf8 needle, Dict(Utf8) in_array
+        let err = eval_in_list_from_array(
+            Arc::new(StringArray::from(vec!["a", "d", "b"])),
+            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
+        )
+        .unwrap_err()
+        .to_string();
+        assert!(
+            err.contains("Can't compare arrays of different types"),
+            "{err}"
+        );
+
+        // Dict(Utf8) needle, Int64 in_array: specialized Int64StaticFilter
+        // rejects the Utf8 dictionary values at construction time
+        let err = eval_in_list_from_array(
+            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "d", "b"]))),
+            Arc::new(Int64Array::from(vec![1, 2, 3])),
+        )
+        .unwrap_err()
+        .to_string();
+        assert!(err.contains("Failed to downcast"), "{err}");
+
+        // Dict(Int64) needle, Dict(Utf8) in_array: both Dict but different
+        // value types, make_comparator rejects the comparison
+        let err = eval_in_list_from_array(
+            wrap_in_dict(Arc::new(Int64Array::from(vec![1, 4, 2]))),
+            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
+        )
+        .unwrap_err()
+        .to_string();
+        assert!(
+            err.contains("Can't compare arrays of different types"),
+            "{err}"
+        );
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 6c4383f997..85f9549357 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -889,3 +889,54 @@ set datafusion.execution.parquet.pushdown_filters = false;
 
 statement ok
 DROP TABLE t_struct_filter;
+
+##########
+# Regression test for https://github.com/apache/datafusion/issues/20937
+#
+# Dynamic filter pushdown fails when joining VALUES against
+# Dictionary-encoded Parquet columns. The InListExpr's ArrayStaticFilter
+# unwraps the needle Dictionary but not the stored in_array, causing a
+# make_comparator(Utf8, Dictionary) type mismatch.
+##########
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+set datafusion.execution.parquet.reorder_filters = true;
+
+statement ok
+COPY (
+  SELECT
+    arrow_cast(chr(65 + (row_num % 26)), 'Dictionary(Int32, Utf8)') as tag1,
+    row_num * 1.0 as value
+  FROM (SELECT unnest(range(0, 10000)) as row_num)
+) TO 'test_files/scratch/parquet_filter_pushdown/dict_filter_bug.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE dict_filter_bug
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/dict_filter_bug.parquet';
+
+query TR
+SELECT t.tag1, t.value
+FROM dict_filter_bug t
+JOIN (VALUES ('A'), ('B')) AS v(c1)
+ON t.tag1 = v.c1
+ORDER BY t.tag1, t.value
+LIMIT 4;
+----
+A 0
+A 26
+A 52
+A 78
+
+# Cleanup
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+statement ok
+set datafusion.execution.parquet.reorder_filters = false;
+
+statement ok
+DROP TABLE dict_filter_bug;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to