wirybeaver commented on code in PR #22988:
URL: https://github.com/apache/datafusion/pull/22988#discussion_r3477731404
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
Review Comment:
Fixed — when an alias is present we now use it as the schema qualifier via
`TableReference::bare(alias.name.value.clone())`, so alias-qualified references
such as `t.col` (where `t` is the target table's alias) resolve correctly in the
`ON` and `WHEN` expressions.
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+ target_table_ref.clone(),
+ &target_table_source.schema(),
+ )?);
+
+ // 2. Plan the source (USING clause) as a LogicalPlan
+ let mut planner_context = PlannerContext::new();
+ let source_table_with_joins = TableWithJoins {
+ relation: source,
+ joins: vec![],
+ };
+ let source_plan =
+ self.plan_from_tables(vec![source_table_with_joins], &mut
planner_context)?;
+
+ // 3. Build a combined schema for resolving expressions in ON and WHEN
clauses
+ let combined_schema =
+ Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+ // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+ let on_expr = self.sql_to_expr(*on, &combined_schema, &mut
planner_context)?;
+
+ // 5. Convert each WHEN clause
+ let df_clauses = clauses
+ .into_iter()
+ .map(|clause| {
+ self.merge_clause_to_plan(
+ clause,
+ &combined_schema,
+ &target_schema,
+ &target_alias,
+ &mut planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // 6. Build the DmlStatement
+ let plan = LogicalPlan::Dml(DmlStatement::new(
+ target_table_ref,
+ target_table_source,
+ WriteOp::MergeInto(Box::new(MergeIntoOp {
+ on: on_expr,
+ clauses: df_clauses,
+ })),
+ Arc::new(source_plan),
+ ));
+
+ Ok(plan)
+ }
+
+ fn merge_clause_to_plan(
+ &self,
+ clause: ast::MergeClause,
+ combined_schema: &DFSchema,
+ target_schema: &DFSchema,
+ _target_alias: &Option<ast::TableAlias>,
+ planner_context: &mut PlannerContext,
+ ) -> Result<MergeIntoClause> {
+ let kind = match clause.clause_kind {
+ ast::MergeClauseKind::Matched => MergeIntoClauseKind::Matched,
+ ast::MergeClauseKind::NotMatched =>
MergeIntoClauseKind::NotMatched,
+ ast::MergeClauseKind::NotMatchedByTarget => {
+ MergeIntoClauseKind::NotMatchedByTarget
+ }
+ ast::MergeClauseKind::NotMatchedBySource => {
+ MergeIntoClauseKind::NotMatchedBySource
+ }
+ };
+
+ let predicate = clause
+ .predicate
+ .map(|p| self.sql_to_expr(p, combined_schema, planner_context))
+ .transpose()?;
+
+ let action = match clause.action {
+ ast::MergeAction::Update(update_expr) => {
+ let assignments = update_expr
+ .assignments
+ .into_iter()
+ .map(|assign| {
+ let col_name = match &assign.target {
+ AssignmentTarget::ColumnName(cols) => cols
+ .0
+ .iter()
+ .last()
+ .ok_or_else(|| plan_datafusion_err!("Empty
column id"))?
+ .as_ident()
+ .unwrap()
+ .value
+ .clone(),
+ _ => plan_err!("Tuples are not supported")?,
+ };
+ // Validate column exists in target
+ target_schema.field_with_unqualified_name(&col_name)?;
+ let value = self.sql_to_expr(
+ assign.value,
+ combined_schema,
+ planner_context,
+ )?;
+ Ok((col_name, value))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ MergeIntoAction::Update(assignments)
+ }
+ ast::MergeAction::Insert(insert_expr) => {
+ let columns: Vec<String> = insert_expr
Review Comment:
Fixed — the INSERT action is now validated before it is built: duplicate column
names are rejected, each named column is checked against the target schema via
`field_with_unqualified_name`, and the number of values must equal either the
number of listed columns (when a column list is given) or the number of columns
in the target schema (when none is given).
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+ target_table_ref.clone(),
+ &target_table_source.schema(),
+ )?);
+
+ // 2. Plan the source (USING clause) as a LogicalPlan
+ let mut planner_context = PlannerContext::new();
+ let source_table_with_joins = TableWithJoins {
+ relation: source,
+ joins: vec![],
+ };
+ let source_plan =
+ self.plan_from_tables(vec![source_table_with_joins], &mut
planner_context)?;
+
+ // 3. Build a combined schema for resolving expressions in ON and WHEN
clauses
+ let combined_schema =
+ Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+ // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+ let on_expr = self.sql_to_expr(*on, &combined_schema, &mut
planner_context)?;
+
+ // 5. Convert each WHEN clause
+ let df_clauses = clauses
+ .into_iter()
+ .map(|clause| {
+ self.merge_clause_to_plan(
+ clause,
+ &combined_schema,
+ &target_schema,
+ &target_alias,
+ &mut planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // 6. Build the DmlStatement
Review Comment:
Done — it now returns `Ok(LogicalPlan::Dml(...))` directly instead of binding the plan to an intermediate `plan` variable first.
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+ target_table_ref.clone(),
+ &target_table_source.schema(),
+ )?);
+
+ // 2. Plan the source (USING clause) as a LogicalPlan
+ let mut planner_context = PlannerContext::new();
+ let source_table_with_joins = TableWithJoins {
+ relation: source,
+ joins: vec![],
+ };
+ let source_plan =
+ self.plan_from_tables(vec![source_table_with_joins], &mut
planner_context)?;
+
+ // 3. Build a combined schema for resolving expressions in ON and WHEN
clauses
+ let combined_schema =
+ Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+ // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+ let on_expr = self.sql_to_expr(*on, &combined_schema, &mut
planner_context)?;
+
+ // 5. Convert each WHEN clause
+ let df_clauses = clauses
+ .into_iter()
+ .map(|clause| {
+ self.merge_clause_to_plan(
+ clause,
+ &combined_schema,
+ &target_schema,
+ &target_alias,
+ &mut planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // 6. Build the DmlStatement
+ let plan = LogicalPlan::Dml(DmlStatement::new(
+ target_table_ref,
+ target_table_source,
+ WriteOp::MergeInto(Box::new(MergeIntoOp {
+ on: on_expr,
+ clauses: df_clauses,
+ })),
+ Arc::new(source_plan),
+ ));
+
+ Ok(plan)
+ }
+
+ fn merge_clause_to_plan(
+ &self,
+ clause: ast::MergeClause,
+ combined_schema: &DFSchema,
+ target_schema: &DFSchema,
+ _target_alias: &Option<ast::TableAlias>,
+ planner_context: &mut PlannerContext,
+ ) -> Result<MergeIntoClause> {
+ let kind = match clause.clause_kind {
+ ast::MergeClauseKind::Matched => MergeIntoClauseKind::Matched,
+ ast::MergeClauseKind::NotMatched =>
MergeIntoClauseKind::NotMatched,
+ ast::MergeClauseKind::NotMatchedByTarget => {
+ MergeIntoClauseKind::NotMatchedByTarget
+ }
+ ast::MergeClauseKind::NotMatchedBySource => {
+ MergeIntoClauseKind::NotMatchedBySource
+ }
+ };
+
+ let predicate = clause
+ .predicate
+ .map(|p| self.sql_to_expr(p, combined_schema, planner_context))
+ .transpose()?;
+
+ let action = match clause.action {
+ ast::MergeAction::Update(update_expr) => {
+ let assignments = update_expr
+ .assignments
+ .into_iter()
+ .map(|assign| {
+ let col_name = match &assign.target {
Review Comment:
Done — extracted `ident_from_object_name_last` as a private associated
function that returns `Result<String>` and returns an error on non-identifier
parts, replacing the `.as_ident().unwrap()` call that could panic. Both the
UPDATE and INSERT column extraction now go through it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]