This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 05e00aeb17 [branch-53] Fix DELETE/UPDATE filter extraction when predicates are pushed down into TableScan (#19884) (#20898)
05e00aeb17 is described below

commit 05e00aeb17fc17adc3ffea90de8c786b7d8a9673
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 11:59:10 2026 -0400

    [branch-53] Fix DELETE/UPDATE filter extraction when predicates are pushed down into TableScan (#19884) (#20898)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes https://github.com/apache/datafusion/issues/19840 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/19884 from @kosiew
    to the branch-53 line
    
    Co-authored-by: kosiew <[email protected]>
---
 datafusion/core/src/physical_planner.rs            | 153 +++++++-
 .../tests/custom_sources_cases/dml_planning.rs     | 426 ++++++++++++++++++++-
 datafusion/sql/src/statement.rs                    |  11 +-
 datafusion/sqllogictest/test_files/update.slt      |  63 +--
 4 files changed, 610 insertions(+), 43 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 14f3e5cf03..12406b6c29 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -18,7 +18,7 @@
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
 
 use std::borrow::Cow;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use crate::datasource::file_format::file_type_to_format;
@@ -84,7 +84,7 @@ use datafusion_expr::expr::{
 };
 use datafusion_expr::expr_rewriter::unnormalize_cols;
 use 
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
-use datafusion_expr::utils::split_conjunction;
+use datafusion_expr::utils::{expr_to_columns, split_conjunction};
 use datafusion_expr::{
     Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, 
Extension,
     FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, 
StringifiedPlan,
@@ -757,7 +757,7 @@ impl DefaultPhysicalPlanner {
                 if let Some(provider) =
                     target.as_any().downcast_ref::<DefaultTableSource>()
                 {
-                    let filters = extract_dml_filters(input)?;
+                    let filters = extract_dml_filters(input, table_name)?;
                     provider
                         .table_provider
                         .delete_from(session_state, filters)
@@ -783,7 +783,7 @@ impl DefaultPhysicalPlanner {
                 {
                     // For UPDATE, the assignments are encoded in the 
projection of input
                     // We pass the filters and let the provider handle the 
projection
-                    let filters = extract_dml_filters(input)?;
+                    let filters = extract_dml_filters(input, table_name)?;
                     // Extract assignments from the projection in input plan
                     let assignments = extract_update_assignments(input)?;
                     provider
@@ -2067,24 +2067,149 @@ fn get_physical_expr_pair(
 }
 
 /// Extract filter predicates from a DML input plan (DELETE/UPDATE).
-/// Walks the logical plan tree and collects Filter predicates,
-/// splitting AND conjunctions into individual expressions.
-/// Column qualifiers are stripped so expressions can be evaluated against
-/// the TableProvider's schema.
 ///
-fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
+/// Walks the logical plan tree and collects Filter predicates and any filters
+/// pushed down into TableScan nodes, splitting AND conjunctions into 
individual expressions.
+///
+/// For UPDATE...FROM queries involving multiple tables, this function only 
extracts predicates
+/// that reference the target table. Filters from source table scans are 
excluded to prevent
+/// incorrect filter semantics.
+///
+/// Column qualifiers are stripped so expressions can be evaluated against the 
TableProvider's
+/// schema. Deduplication is performed because filters may appear in both 
Filter nodes and
+/// TableScan.filters when the optimizer performs partial (Inexact) filter 
pushdown.
+///
+/// # Parameters
+/// - `input`: The logical plan tree to extract filters from (typically a 
DELETE or UPDATE plan)
+/// - `target`: The target table reference to scope filter extraction 
(prevents multi-table filter leakage)
+///
+/// # Returns
+/// A vector of unqualified filter expressions that can be passed to the 
TableProvider for execution.
+/// Returns an empty vector if no applicable filters are found.
+///
+fn extract_dml_filters(
+    input: &Arc<LogicalPlan>,
+    target: &TableReference,
+) -> Result<Vec<Expr>> {
     let mut filters = Vec::new();
+    let mut allowed_refs = vec![target.clone()];
+
+    // First pass: collect any alias references to the target table
+    input.apply(|node| {
+        if let LogicalPlan::SubqueryAlias(alias) = node
+            // Check if this alias points to the target table
+            && let LogicalPlan::TableScan(scan) = alias.input.as_ref()
+            && scan.table_name.resolved_eq(target)
+        {
+            allowed_refs.push(TableReference::bare(alias.alias.to_string()));
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;
 
     input.apply(|node| {
-        if let LogicalPlan::Filter(filter) = node {
-            // Split AND predicates into individual expressions
-            
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+        match node {
+            LogicalPlan::Filter(filter) => {
+                // Split AND predicates into individual expressions
+                for predicate in split_conjunction(&filter.predicate) {
+                    if predicate_is_on_target_multi(predicate, &allowed_refs)? 
{
+                        filters.push(predicate.clone());
+                    }
+                }
+            }
+            LogicalPlan::TableScan(TableScan {
+                table_name,
+                filters: scan_filters,
+                ..
+            }) => {
+                // Only extract filters from the target table scan.
+                // This prevents incorrect filter extraction in UPDATE...FROM 
scenarios
+                // where multiple table scans may have filters.
+                if table_name.resolved_eq(target) {
+                    for filter in scan_filters {
+                        
filters.extend(split_conjunction(filter).into_iter().cloned());
+                    }
+                }
+            }
+            // Plans without filter information
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Values(_)
+            | LogicalPlan::DescribeTable(_)
+            | LogicalPlan::Explain(_)
+            | LogicalPlan::Analyze(_)
+            | LogicalPlan::Distinct(_)
+            | LogicalPlan::Extension(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::Dml(_)
+            | LogicalPlan::Ddl(_)
+            | LogicalPlan::Copy(_)
+            | LogicalPlan::Unnest(_)
+            | LogicalPlan::RecursiveQuery(_) => {
+                // No filters to extract from leaf/meta plans
+            }
+            // Plans with inputs (may contain filters in children)
+            LogicalPlan::Projection(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::Sort(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::Join(_)
+            | LogicalPlan::Repartition(_)
+            | LogicalPlan::Aggregate(_)
+            | LogicalPlan::Window(_)
+            | LogicalPlan::Subquery(_) => {
+                // Filter information may appear in child nodes; continue 
traversal
+                // to extract filters from Filter/TableScan nodes deeper in 
the plan
+            }
         }
         Ok(TreeNodeRecursion::Continue)
     })?;
 
-    // Strip table qualifiers from column references
-    filters.into_iter().map(strip_column_qualifiers).collect()
+    // Strip qualifiers and deduplicate. This ensures:
+    // 1. Only target-table predicates are retained from Filter nodes
+    // 2. Qualifiers stripped for TableProvider compatibility
+    // 3. Duplicates removed (from Filter nodes + TableScan.filters)
+    //
+    // Deduplication is necessary because filters may appear in both Filter 
nodes
+    // and TableScan.filters when the optimizer performs partial (Inexact) 
pushdown.
+    let mut seen_filters = HashSet::new();
+    filters
+        .into_iter()
+        .try_fold(Vec::new(), |mut deduped, filter| {
+            let unqualified = strip_column_qualifiers(filter).map_err(|e| {
+                e.context(format!(
+                    "Failed to strip column qualifiers for DML filter on table 
'{target}'"
+                ))
+            })?;
+            if seen_filters.insert(unqualified.clone()) {
+                deduped.push(unqualified);
+            }
+            Ok(deduped)
+        })
+}
+
+/// Determine whether a predicate references only columns from the target table
+/// or its aliases.
+///
+/// Columns may be qualified with the target table name or any of its aliases.
+/// Unqualified columns are also accepted as they implicitly belong to the 
target table.
+fn predicate_is_on_target_multi(
+    expr: &Expr,
+    allowed_refs: &[TableReference],
+) -> Result<bool> {
+    let mut columns = HashSet::new();
+    expr_to_columns(expr, &mut columns)?;
+
+    // Short-circuit on first mismatch: returns false if any column references 
a table not in allowed_refs.
+    // Columns are accepted if:
+    // 1. They are unqualified (no relation specified), OR
+    // 2. Their relation matches one of the allowed table references using 
resolved equality
+    Ok(!columns.iter().any(|column| {
+        column.relation.as_ref().is_some_and(|relation| {
+            !allowed_refs
+                .iter()
+                .any(|allowed| relation.resolved_eq(allowed))
+        })
+    }))
 }
 
 /// Strip table qualifiers from column references in an expression.
diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs 
b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
index c53819ffcc..8c4bae5e98 100644
--- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs
+++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs
@@ -25,9 +25,12 @@ use async_trait::async_trait;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::logical_expr::Expr;
+use datafusion::logical_expr::{
+    Expr, LogicalPlan, TableProviderFilterPushDown, TableScan,
+};
 use datafusion_catalog::Session;
 use datafusion_common::ScalarValue;
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_physical_plan::empty::EmptyExec;
 
@@ -35,6 +38,8 @@ use datafusion_physical_plan::empty::EmptyExec;
 struct CaptureDeleteProvider {
     schema: SchemaRef,
     received_filters: Arc<Mutex<Option<Vec<Expr>>>>,
+    filter_pushdown: TableProviderFilterPushDown,
+    per_filter_pushdown: Option<Vec<TableProviderFilterPushDown>>,
 }
 
 impl CaptureDeleteProvider {
@@ -42,6 +47,32 @@ impl CaptureDeleteProvider {
         Self {
             schema,
             received_filters: Arc::new(Mutex::new(None)),
+            filter_pushdown: TableProviderFilterPushDown::Unsupported,
+            per_filter_pushdown: None,
+        }
+    }
+
+    fn new_with_filter_pushdown(
+        schema: SchemaRef,
+        filter_pushdown: TableProviderFilterPushDown,
+    ) -> Self {
+        Self {
+            schema,
+            received_filters: Arc::new(Mutex::new(None)),
+            filter_pushdown,
+            per_filter_pushdown: None,
+        }
+    }
+
+    fn new_with_per_filter_pushdown(
+        schema: SchemaRef,
+        per_filter_pushdown: Vec<TableProviderFilterPushDown>,
+    ) -> Self {
+        Self {
+            schema,
+            received_filters: Arc::new(Mutex::new(None)),
+            filter_pushdown: TableProviderFilterPushDown::Unsupported,
+            per_filter_pushdown: Some(per_filter_pushdown),
         }
     }
 
@@ -92,6 +123,19 @@ impl TableProvider for CaptureDeleteProvider {
             Field::new("count", DataType::UInt64, false),
         ])))))
     }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        if let Some(per_filter) = &self.per_filter_pushdown
+            && per_filter.len() == filters.len()
+        {
+            return Ok(per_filter.clone());
+        }
+
+        Ok(vec![self.filter_pushdown.clone(); filters.len()])
+    }
 }
 
 /// A TableProvider that captures filters and assignments passed to update().
@@ -100,6 +144,8 @@ struct CaptureUpdateProvider {
     schema: SchemaRef,
     received_filters: Arc<Mutex<Option<Vec<Expr>>>>,
     received_assignments: Arc<Mutex<Option<Vec<(String, Expr)>>>>,
+    filter_pushdown: TableProviderFilterPushDown,
+    per_filter_pushdown: Option<Vec<TableProviderFilterPushDown>>,
 }
 
 impl CaptureUpdateProvider {
@@ -108,6 +154,21 @@ impl CaptureUpdateProvider {
             schema,
             received_filters: Arc::new(Mutex::new(None)),
             received_assignments: Arc::new(Mutex::new(None)),
+            filter_pushdown: TableProviderFilterPushDown::Unsupported,
+            per_filter_pushdown: None,
+        }
+    }
+
+    fn new_with_filter_pushdown(
+        schema: SchemaRef,
+        filter_pushdown: TableProviderFilterPushDown,
+    ) -> Self {
+        Self {
+            schema,
+            received_filters: Arc::new(Mutex::new(None)),
+            received_assignments: Arc::new(Mutex::new(None)),
+            filter_pushdown,
+            per_filter_pushdown: None,
         }
     }
 
@@ -164,6 +225,19 @@ impl TableProvider for CaptureUpdateProvider {
             Field::new("count", DataType::UInt64, false),
         ])))))
     }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        if let Some(per_filter) = &self.per_filter_pushdown
+            && per_filter.len() == filters.len()
+        {
+            return Ok(per_filter.clone());
+        }
+
+        Ok(vec![self.filter_pushdown.clone(); filters.len()])
+    }
 }
 
 /// A TableProvider that captures whether truncate() was called.
@@ -307,6 +381,168 @@ async fn test_delete_complex_expr() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_delete_filter_pushdown_extracts_table_scan_filters() -> 
Result<()> {
+    let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Exact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+    let df = ctx.sql("DELETE FROM t WHERE id = 1").await?;
+    let optimized_plan = df.clone().into_optimized_plan()?;
+
+    let mut scan_filters = Vec::new();
+    optimized_plan.apply(|node| {
+        if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+            scan_filters.extend(filters.clone());
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;
+
+    assert_eq!(scan_filters.len(), 1);
+    assert!(scan_filters[0].to_string().contains("id"));
+
+    df.collect().await?;
+
+    let filters = provider
+        .captured_filters()
+        .expect("filters should be captured");
+    assert_eq!(filters.len(), 1);
+    assert!(filters[0].to_string().contains("id"));
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_compound_filters_with_pushdown() -> Result<()> {
+    let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Exact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+    ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'")
+        .await?
+        .collect()
+        .await?;
+
+    let filters = provider
+        .captured_filters()
+        .expect("filters should be captured");
+    // Should receive both filters, not deduplicate valid separate predicates
+    assert_eq!(
+        filters.len(),
+        2,
+        "compound filters should not be over-suppressed"
+    );
+
+    let filter_strs: Vec<String> = filters.iter().map(|f| 
f.to_string()).collect();
+    assert!(
+        filter_strs.iter().any(|s| s.contains("id")),
+        "should contain id filter"
+    );
+    assert!(
+        filter_strs.iter().any(|s| s.contains("status")),
+        "should contain status filter"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_mixed_filter_locations() -> Result<()> {
+    // Test mixed-location filters: some in Filter node, some in 
TableScan.filters
+    // This happens when provider uses TableProviderFilterPushDown::Inexact,
+    // meaning it can push down some predicates but not others.
+    let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Inexact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+    // Execute DELETE with compound WHERE clause
+    ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'")
+        .await?
+        .collect()
+        .await?;
+
+    // Verify that both predicates are extracted and passed to delete_from(),
+    // even though they may be split between Filter node and TableScan.filters
+    let filters = provider
+        .captured_filters()
+        .expect("filters should be captured");
+    assert_eq!(
+        filters.len(),
+        2,
+        "should extract both predicates (union of Filter and 
TableScan.filters)"
+    );
+
+    let filter_strs: Vec<String> = filters.iter().map(|f| 
f.to_string()).collect();
+    assert!(
+        filter_strs.iter().any(|s| s.contains("id")),
+        "should contain id filter"
+    );
+    assert!(
+        filter_strs.iter().any(|s| s.contains("status")),
+        "should contain status filter"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_per_filter_pushdown_mixed_locations() -> Result<()> {
+    // Force per-filter pushdown decisions to exercise mixed locations in one 
query.
+    // First predicate is pushed down (Exact), second stays as residual 
(Unsupported).
+    let provider = 
Arc::new(CaptureDeleteProvider::new_with_per_filter_pushdown(
+        test_schema(),
+        vec![
+            TableProviderFilterPushDown::Exact,
+            TableProviderFilterPushDown::Unsupported,
+        ],
+    ));
+
+    let ctx = SessionContext::new();
+    ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+    let df = ctx
+        .sql("DELETE FROM t WHERE id = 1 AND status = 'active'")
+        .await?;
+    let optimized_plan = df.clone().into_optimized_plan()?;
+
+    // Only the first predicate should be pushed to TableScan.filters.
+    let mut scan_filters = Vec::new();
+    optimized_plan.apply(|node| {
+        if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+            scan_filters.extend(filters.clone());
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;
+    assert_eq!(scan_filters.len(), 1);
+    assert!(scan_filters[0].to_string().contains("id"));
+
+    // Both predicates should still reach the provider (union + dedup 
behavior).
+    df.collect().await?;
+
+    let filters = provider
+        .captured_filters()
+        .expect("filters should be captured");
+    assert_eq!(filters.len(), 2);
+
+    let filter_strs: Vec<String> = filters.iter().map(|f| 
f.to_string()).collect();
+    assert!(
+        filter_strs.iter().any(|s| s.contains("id")),
+        "should contain pushed-down id filter"
+    );
+    assert!(
+        filter_strs.iter().any(|s| s.contains("status")),
+        "should contain residual status filter"
+    );
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_update_assignments() -> Result<()> {
     let provider = Arc::new(CaptureUpdateProvider::new(test_schema()));
@@ -330,6 +566,80 @@ async fn test_update_assignments() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_update_filter_pushdown_extracts_table_scan_filters() -> 
Result<()> {
+    let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Exact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+    let df = ctx.sql("UPDATE t SET value = 100 WHERE id = 1").await?;
+    let optimized_plan = df.clone().into_optimized_plan()?;
+
+    // Verify that the optimizer pushed down the filter into TableScan
+    let mut scan_filters = Vec::new();
+    optimized_plan.apply(|node| {
+        if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+            scan_filters.extend(filters.clone());
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;
+
+    assert_eq!(scan_filters.len(), 1);
+    assert!(scan_filters[0].to_string().contains("id"));
+
+    // Execute the UPDATE and verify filters were extracted and passed to 
update()
+    df.collect().await?;
+
+    let filters = provider
+        .captured_filters()
+        .expect("filters should be captured");
+    assert_eq!(filters.len(), 1);
+    assert!(filters[0].to_string().contains("id"));
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_update_filter_pushdown_passes_table_scan_filters() -> Result<()> 
{
+    let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Exact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+    let df = ctx
+        .sql("UPDATE t SET value = 42 WHERE status = 'ready'")
+        .await?;
+    let optimized_plan = df.clone().into_optimized_plan()?;
+
+    let mut scan_filters = Vec::new();
+    optimized_plan.apply(|node| {
+        if let LogicalPlan::TableScan(TableScan { filters, .. }) = node {
+            scan_filters.extend(filters.clone());
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;
+
+    assert!(
+        !scan_filters.is_empty(),
+        "expected filter pushdown to populate TableScan filters"
+    );
+
+    df.collect().await?;
+
+    let filters = provider
+        .captured_filters()
+        .expect("filters should be captured");
+    assert!(
+        !filters.is_empty(),
+        "expected filters extracted from TableScan during UPDATE"
+    );
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_truncate_calls_provider() -> Result<()> {
     let provider = Arc::new(CaptureTruncateProvider::new(test_schema()));
@@ -379,6 +689,120 @@ async fn test_unsupported_table_update() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_delete_target_table_scoping() -> Result<()> {
+    // Test that DELETE only extracts filters from the target table,
+    // not from other tables (important for DELETE...FROM safety)
+    let target_provider = 
Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Exact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table(
+        "target_t",
+        Arc::clone(&target_provider) as Arc<dyn TableProvider>,
+    )?;
+
+    // For now, we test single-table DELETE
+    // and validate that the scoping logic is correct
+    let df = ctx.sql("DELETE FROM target_t WHERE id > 5").await?;
+    df.collect().await?;
+
+    let filters = target_provider
+        .captured_filters()
+        .expect("filters should be captured");
+    assert_eq!(filters.len(), 1);
+    assert!(
+        filters[0].to_string().contains("id"),
+        "Filter should be for id column"
+    );
+    assert!(
+        filters[0].to_string().contains("5"),
+        "Filter should contain the value 5"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_update_from_drops_non_target_predicates() -> Result<()> {
+    // UPDATE ... FROM is currently not working
+    // TODO fix https://github.com/apache/datafusion/issues/19950
+    let target_provider = 
Arc::new(CaptureUpdateProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Exact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table("t1", Arc::clone(&target_provider) as Arc<dyn 
TableProvider>)?;
+
+    let source_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("status", DataType::Utf8, true),
+        // t2-only column to avoid false negatives after qualifier stripping
+        Field::new("src_only", DataType::Utf8, true),
+    ]));
+    let source_table = 
datafusion::datasource::empty::EmptyTable::new(source_schema);
+    ctx.register_table("t2", Arc::new(source_table))?;
+
+    let result = ctx
+        .sql(
+            "UPDATE t1 SET value = 1 FROM t2 \
+             WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10",
+        )
+        .await;
+
+    // Verify UPDATE ... FROM is rejected with appropriate error
+    // TODO fix https://github.com/apache/datafusion/issues/19950
+    assert!(result.is_err());
+    let err = result.unwrap_err();
+    assert!(
+        err.to_string().contains("UPDATE ... FROM is not supported"),
+        "Expected 'UPDATE ... FROM is not supported' error, got: {err}"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_delete_qualifier_stripping_and_validation() -> Result<()> {
+    // Test that filter qualifiers are properly stripped and validated
+    // Unqualified predicates should work fine
+    let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown(
+        test_schema(),
+        TableProviderFilterPushDown::Exact,
+    ));
+    let ctx = SessionContext::new();
+    ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
+
+    // Execute DELETE with unqualified column reference
+    // (After parsing, the planner adds qualifiers, but our validation should 
accept them)
+    let df = ctx.sql("DELETE FROM t WHERE id = 1").await?;
+    df.collect().await?;
+
+    let filters = provider
+        .captured_filters()
+        .expect("filters should be captured");
+    assert!(!filters.is_empty(), "Should have extracted filter");
+
+    // Verify qualifiers are stripped: check that Column expressions have no 
qualifier
+    let has_qualified_column = filters[0]
+        .exists(|expr| Ok(matches!(expr, Expr::Column(col) if 
col.relation.is_some())))?;
+    assert!(
+        !has_qualified_column,
+        "Filter should have unqualified columns after stripping"
+    );
+
+    // Also verify the string representation doesn't contain table qualifiers
+    let filter_str = filters[0].to_string();
+    assert!(
+        !filter_str.contains("t.id"),
+        "Filter should not contain qualified column reference, got: 
{filter_str}"
+    );
+    assert!(
+        filter_str.contains("id") || filter_str.contains("1"),
+        "Filter should reference id column or the value 1, got: {filter_str}"
+    );
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_unsupported_table_truncate() -> Result<()> {
     let schema = test_schema();
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 32bc8cb244..b91e38e537 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1078,9 +1078,18 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     });
                 // TODO: support multiple tables in UPDATE SET FROM
                 if from_clauses.as_ref().is_some_and(|f| f.len() > 1) {
-                    plan_err!("Multiple tables in UPDATE SET FROM not yet 
supported")?;
+                    not_impl_err!(
+                        "Multiple tables in UPDATE SET FROM not yet supported"
+                    )?;
                 }
                 let update_from = from_clauses.and_then(|mut f| f.pop());
+
+                // UPDATE ... FROM is currently not working
+                // TODO fix https://github.com/apache/datafusion/issues/19950
+                if update_from.is_some() {
+                    return not_impl_err!("UPDATE ... FROM is not supported");
+                }
+
                 if returning.is_some() {
                     plan_err!("Update-returning clause not yet supported")?;
                 }
diff --git a/datafusion/sqllogictest/test_files/update.slt 
b/datafusion/sqllogictest/test_files/update.slt
index a652ae7633..1cd2b626e3 100644
--- a/datafusion/sqllogictest/test_files/update.slt
+++ b/datafusion/sqllogictest/test_files/update.slt
@@ -67,39 +67,48 @@ logical_plan
 physical_plan_error This feature is not implemented: Physical plan does not 
support logical expression ScalarSubquery(<subquery>)
 
 # set from other table
-query TT
+# UPDATE ... FROM is currently unsupported
+# TODO fix https://github.com/apache/datafusion/issues/19950
+query error DataFusion error: This feature is not implemented: UPDATE ... FROM 
is not supported
 explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and 
t1.b > 'foo' and t2.c > 1.0;
-----
-logical_plan
-01)Dml: op=[Update] table=[t1]
-02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, 
CAST(Int64(1) AS Int32) AS d
-03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > 
Float64(1)
-04)------Cross Join:
-05)--------TableScan: t1
-06)--------TableScan: t2
-physical_plan
-01)CooperativeExec
-02)--DmlResultExec: rows_affected=0
 
+# test update from other table with actual data
 statement ok
-create table t3(a int, b varchar, c double, d int);
+insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar', 
4.0, 30);
+
+statement ok
+insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), 
(4, 'updated_b3', 1.5, 60);
+
+# UPDATE ... FROM is currently unsupported - qualifier stripping breaks source 
column references
+# causing assignments like 'b = t2.b' to resolve to target table's 'b' instead 
of source table's 'b'
+# TODO fix https://github.com/apache/datafusion/issues/19950
+statement error DataFusion error: This feature is not implemented: UPDATE ... 
FROM is not supported
+update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 
'foo' and t2.c > 1.0;
 
 # set from multiple tables, DataFusion only supports from one table
-query error DataFusion error: Error during planning: Multiple tables in UPDATE 
SET FROM not yet supported
+statement error DataFusion error: This feature is not implemented: Multiple 
tables in UPDATE SET FROM not yet supported
 explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a 
and t1.a = t3.a;
 
 # test table alias
-query TT
+# UPDATE ... FROM is currently unsupported
+# TODO fix https://github.com/apache/datafusion/issues/19950
+statement error DataFusion error: This feature is not implemented: UPDATE ... 
FROM is not supported
 explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a 
and t.b > 'foo' and t2.c > 1.0;
-----
-logical_plan
-01)Dml: op=[Update] table=[t1]
-02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) 
AS Int32) AS d
-03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > 
Float64(1)
-04)------Cross Join:
-05)--------SubqueryAlias: t
-06)----------TableScan: t1
-07)--------TableScan: t2
-physical_plan
-01)CooperativeExec
-02)--DmlResultExec: rows_affected=0
+
+# test update with table alias with actual data
+statement ok
+delete from t1;
+
+statement ok
+delete from t2;
+
+statement ok
+insert into t1 values (1, 'zebra', 1.5, 5), (2, 'wolf', 2.0, 10), (3, 'apple', 
3.5, 15);
+
+statement ok
+insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200);
+
+# UPDATE ... FROM is currently unsupported
+# TODO fix https://github.com/apache/datafusion/issues/19950
+statement error DataFusion error: This feature is not implemented: UPDATE ... 
FROM is not supported
+update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 
'foo' and t2.c > 1.0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to