This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1996434b646 [SPARK-40550][SQL] DataSource V2: Handle DELETE commands 
for delta-based sources
1996434b646 is described below

commit 1996434b646040c65c09d9efb7888548900f00fc
Author: aokolnychyi <[email protected]>
AuthorDate: Thu Jan 19 15:20:12 2023 -0800

    [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based 
sources
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for DELETE commands for delta-based sources and 
implements the API added in PR #38004.
    
    Suppose there is a data source capable of encoding deletes using primary 
keys (`pk`). Also, this data source requires knowing the source file, which can 
be projected via a metadata column (`_file`), to encode deletes efficiently.
    
    As an example, there will be a table with 1 file that contains 3 records.
    ```
    pk | salary | department
    ------------------------
    1, 100, hr
    2, 50, software
    3, 150, hardware
    ```
    
    This PR would rewrite `DELETE FROM t WHERE salary <= 100` to perform the 
following steps:
    - find records that need to be removed by scanning the table with the 
delete condition;
    - project required columns to encode deletes (`pk` + `_file` in our case);
    - form a set of changes by adding a new column `__row_operation` column 
with value `delete`;
    - write the set of changes to the table using `WriteDeltaExec` and 
`DeltaWriter`;
    
    The set of changes to encode for the DELETE statement above will look like 
this:
    
    ```
    __row_operation | pk | _file
    ----------------------------
    delete, 1, file_a.parquet
    delete, 2, file_a.parquet
    ```
    
    As opposed to group-based deletes that Spark already supports, the new 
logic will be able to discard records that did not change in the file that had 
matches (i.e. the record with `pk = 3` did not match the condition and was 
discarded).
    
    Then `WriteDeltaExec` will handle this set of changes and translate them 
into `delete()` calls on `DeltaWriter`. In the future, this logic will be 
extended to also cover UPDATEs and MERGEs by adding `update` and `insert` row 
operations to the set of changes supported by `WriteDeltaExec`.
    
    ### Why are the changes needed?
    
    Thes changes are needed as per SPIP SPARK-35801.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR comes with tests.
    
    Closes #38005 from aokolnychyi/spark-40550-proto.
    
    Authored-by: aokolnychyi <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 core/src/main/resources/error/error-classes.json   |   6 +
 .../spark/sql/catalyst/ProjectingInternalRow.scala | 117 +++++++++++
 .../catalyst/analysis/RewriteDeleteFromTable.scala |  40 +++-
 .../catalyst/analysis/RewriteRowLevelCommand.scala |  61 +++++-
 .../ReplaceNullWithFalseInPredicate.scala          |   3 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    | 123 ++++++++++-
 .../spark/sql/catalyst/util/RowDeltaUtils.scala    |  28 +++
 .../sql/catalyst/util/WriteDeltaProjections.scala  |  25 +++
 .../spark/sql/errors/QueryCompilationErrors.scala  |   6 +
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  14 +-
 .../catalog/InMemoryRowLevelOperationTable.scala   |  84 +++++++-
 .../datasources/v2/DataSourceV2Strategy.scala      |   5 +
 .../v2/OptimizeMetadataOnlyDeleteFromTable.scala   |   6 +-
 .../sql/execution/datasources/v2/V2Writes.scala    |  34 ++-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 113 +++++++++-
 .../sql/connector/DeleteFromTableSuiteBase.scala   | 234 +++++++++++----------
 .../connector/DeltaBasedDeleteFromTableSuite.scala |  42 ++++
 17 files changed, 798 insertions(+), 143 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 78b1bf0743d..993d1d084e7 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1048,6 +1048,12 @@
     ],
     "sqlState" : "42000"
   },
+  "NULLABLE_ROW_ID_ATTRIBUTES" : {
+    "message" : [
+      "Row ID attributes cannot be nullable: <nullableRowIdAttrs>."
+    ],
+    "sqlState" : "42000"
+  },
   "NULL_MAP_KEY" : {
     "message" : [
       "Cannot use null as map key."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
new file mode 100644
index 00000000000..429ce805bf2
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+/**
+ * An [[InternalRow]] that projects particular columns from another 
[[InternalRow]] without copying
+ * the underlying data.
+ */
+case class ProjectingInternalRow(schema: StructType, colOrdinals: Seq[Int]) 
extends InternalRow {
+  assert(schema.size == colOrdinals.size)
+
+  private var row: InternalRow = _
+
+  override def numFields: Int = colOrdinals.size
+
+  def project(row: InternalRow): Unit = {
+    this.row = row
+  }
+
+  override def setNullAt(i: Int): Unit = {
+    throw new UnsupportedOperationException(s"Cannot modify 
${getClass.getName}")
+  }
+
+  override def update(i: Int, value: Any): Unit = {
+    throw new UnsupportedOperationException(s"Cannot modify 
${getClass.getName}")
+  }
+
+  override def copy(): InternalRow = {
+    val newRow = if (row != null) row.copy() else null
+    val newProjection = ProjectingInternalRow(schema, colOrdinals)
+    newProjection.project(newRow)
+    newProjection
+  }
+
+  override def isNullAt(ordinal: Int): Boolean = {
+    row.isNullAt(colOrdinals(ordinal))
+  }
+
+  override def getBoolean(ordinal: Int): Boolean = {
+    row.getBoolean(colOrdinals(ordinal))
+  }
+
+  override def getByte(ordinal: Int): Byte = {
+    row.getByte(colOrdinals(ordinal))
+  }
+
+  override def getShort(ordinal: Int): Short = {
+    row.getShort(colOrdinals(ordinal))
+  }
+
+  override def getInt(ordinal: Int): Int = {
+    row.getInt(colOrdinals(ordinal))
+  }
+
+  override def getLong(ordinal: Int): Long = {
+    row.getLong(colOrdinals(ordinal))
+  }
+
+  override def getFloat(ordinal: Int): Float = {
+    row.getFloat(colOrdinals(ordinal))
+  }
+
+  override def getDouble(ordinal: Int): Double = {
+    row.getDouble(colOrdinals(ordinal))
+  }
+
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = 
{
+    row.getDecimal(colOrdinals(ordinal), precision, scale)
+  }
+
+  override def getUTF8String(ordinal: Int): UTF8String = {
+    row.getUTF8String(colOrdinals(ordinal))
+  }
+
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    row.getBinary(colOrdinals(ordinal))
+  }
+
+  override def getInterval(ordinal: Int): CalendarInterval = {
+    row.getInterval(colOrdinals(ordinal))
+  }
+
+  override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
+    row.getStruct(colOrdinals(ordinal), numFields)
+  }
+
+  override def getArray(ordinal: Int): ArrayData = {
+    row.getArray(colOrdinals(ordinal))
+  }
+
+  override def getMap(ordinal: Int): MapData = {
+    row.getMap(colOrdinals(ordinal))
+  }
+
+  override def get(ordinal: Int, dataType: DataType): AnyRef = {
+    row.get(colOrdinals(ordinal), dataType)
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
index 394e43dce72..b4e077671d4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, 
Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, EqualNullSafe, 
Expression, Literal, Not}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, ReplaceData}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, Project, ReplaceData, WriteDelta}
+import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
 import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, 
SupportsRowLevelOperations, TruncatableTable}
+import org.apache.spark.sql.connector.write.{RowLevelOperationTable, 
SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
-import org.apache.spark.sql.connector.write.RowLevelOperationTable
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -45,7 +46,12 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand 
{
 
         case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, 
_) =>
           val table = buildOperationTable(t, DELETE, 
CaseInsensitiveStringMap.empty())
-          buildReplaceDataPlan(r, table, cond)
+          table.operation match {
+            case _: SupportsDelta =>
+              buildWriteDeltaPlan(r, table, cond)
+            case _ =>
+              buildReplaceDataPlan(r, table, cond)
+          }
 
         case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
           // don't rewrite as the table supports deletes only with filters
@@ -82,4 +88,30 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand 
{
     val writeRelation = relation.copy(table = operationTable)
     ReplaceData(writeRelation, cond, remainingRowsPlan, relation)
   }
+
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val operation = operationTable.operation.asInstanceOf[SupportsDelta]
+    val rowIdAttrs = resolveRowIdAttrs(relation, operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
+
+    // construct a read relation and include all required metadata columns
+    val readRelation = buildRelationWithAttrs(relation, operationTable, 
metadataAttrs, rowIdAttrs)
+
+    // construct a plan that only contains records to delete
+    val deletedRowsPlan = Filter(cond, readRelation)
+    val operationType = Alias(Literal(DELETE_OPERATION), OPERATION_COLUMN)()
+    val requiredWriteAttrs = dedupAttrs(rowIdAttrs ++ metadataAttrs)
+    val project = Project(operationType +: requiredWriteAttrs, deletedRowsPlan)
+
+    // build a plan to write deletes to the table
+    val writeRelation = relation.copy(table = operationTable)
+    val projections = buildWriteDeltaProjections(project, Nil, rowIdAttrs, 
metadataAttrs)
+    WriteDelta(writeRelation, cond, project, relation, projections)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
index bf8c3e27f4d..1181d85e8be 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
@@ -19,13 +19,17 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, 
V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.ProjectingInternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, ExprId, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
 import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
-import org.apache.spark.sql.connector.write.{RowLevelOperation, 
RowLevelOperationInfoImpl, RowLevelOperationTable}
+import org.apache.spark.sql.connector.write.{RowLevelOperation, 
RowLevelOperationInfoImpl, RowLevelOperationTable, SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
@@ -42,9 +46,10 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
   protected def buildRelationWithAttrs(
       relation: DataSourceV2Relation,
       table: RowLevelOperationTable,
-      metadataAttrs: Seq[AttributeReference]): DataSourceV2Relation = {
+      metadataAttrs: Seq[AttributeReference],
+      rowIdAttrs: Seq[AttributeReference] = Nil): DataSourceV2Relation = {
 
-    val attrs = dedupAttrs(relation.output ++ metadataAttrs)
+    val attrs = dedupAttrs(relation.output ++ rowIdAttrs ++ metadataAttrs)
     relation.copy(table = table, output = attrs)
   }
 
@@ -68,4 +73,52 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
       operation.requiredMetadataAttributes,
       relation)
   }
+
+  protected def resolveRowIdAttrs(
+      relation: DataSourceV2Relation,
+      operation: SupportsDelta): Seq[AttributeReference] = {
+
+    val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+      operation.rowId,
+      relation)
+
+    val nullableRowIdAttrs = rowIdAttrs.filter(_.nullable)
+    if (nullableRowIdAttrs.nonEmpty) {
+      throw QueryCompilationErrors.nullableRowIdError(nullableRowIdAttrs)
+    }
+
+    rowIdAttrs
+  }
+
+  protected def buildWriteDeltaProjections(
+      plan: LogicalPlan,
+      rowAttrs: Seq[Attribute],
+      rowIdAttrs: Seq[Attribute],
+      metadataAttrs: Seq[Attribute]): WriteDeltaProjections = {
+
+    val rowProjection = if (rowAttrs.nonEmpty) {
+      Some(newLazyProjection(plan, rowAttrs))
+    } else {
+      None
+    }
+
+    val rowIdProjection = newLazyProjection(plan, rowIdAttrs)
+
+    val metadataProjection = if (metadataAttrs.nonEmpty) {
+      Some(newLazyProjection(plan, metadataAttrs))
+    } else {
+      None
+    }
+
+    WriteDeltaProjections(rowProjection, rowIdProjection, metadataProjection)
+  }
+
+  private def newLazyProjection(
+      plan: LogicalPlan,
+      attrs: Seq[Attribute]): ProjectingInternalRow = {
+
+    val colOrdinals = attrs.map(attr => plan.output.indexWhere(_.exprId == 
attr.exprId))
+    val schema = StructType.fromAttributes(attrs)
+    ProjectingInternalRow(schema, colOrdinals)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index 3ffaca6f54c..9d81ba2fadb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, 
LambdaFunction, Literal, MapFilter, Not, Or}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, 
DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, 
MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, 
UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, 
DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, 
MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, 
UpdateTable, WriteDelta}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{INSET, NULL_LITERAL, 
TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.types.BooleanType
@@ -55,6 +55,7 @@ object ReplaceNullWithFalseInPredicate extends 
Rule[LogicalPlan] {
     case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
     case j @ Join(_, _, _, Some(cond), _) => j.copy(condition = 
Some(replaceNullWithFalse(cond)))
     case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = 
replaceNullWithFalse(cond))
+    case wd @ WriteDelta(_, cond, _, _, _, _) => wd.copy(condition = 
replaceNullWithFalse(cond))
     case d @ DeleteFromTable(_, cond) => d.copy(condition = 
replaceNullWithFalse(cond))
     case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = 
Some(replaceNullWithFalse(cond)))
     case m: MergeIntoTable =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index eaf74324322..9508b2fb993 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -21,16 +21,16 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, 
EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, 
ResolvedIdentifier, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, MetadataAttribute, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, MetadataAttribute, 
NamedExpression, Unevaluable, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, RowDeltaUtils, 
WriteDeltaProjections}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.connector.write.{RowLevelOperation, 
RowLevelOperationTable, Write}
+import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, 
RowLevelOperationTable, SupportsDelta, Write}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, 
StringType, StructType}
+import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, 
MetadataBuilder, StringType, StructField, StructType}
 
 // For v2 DML commands, it may end up with the v1 fallback code path and need 
to build a DataFrame
 // which is required by the DS v1 API. We need to keep the analyzed input 
query plan to build
@@ -274,6 +274,121 @@ case class ReplaceData(
   }
 }
 
+/**
+ * Writes a delta of rows to an existing table during a row-level operation.
+ *
+ * This node references a query that translates a logical DELETE, UPDATE, 
MERGE operation into
+ * a set of row-level changes to be encoded in the table. Each row in the 
query represents either
+ * a delete, update or insert and stores the operation type in a special 
column.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE 
operations for data sources
+ * that can handle deltas of rows.
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching records
+ * @param query a query with a delta of records that should written
+ * @param originalTable a plan for the original table for which the row-level 
command was triggered
+ * @param projections projections for row ID, row, metadata attributes
+ * @param write a logical write, if already constructed
+ */
+case class WriteDelta(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    projections: WriteDeltaProjections,
+    write: Option[DeltaWrite] = None) extends RowLevelWrite {
+
+  override val isByName: Boolean = false
+  override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  override lazy val references: AttributeSet = query.outputSet
+
+  lazy val operation: SupportsDelta = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, 
_) =>
+        operation.asInstanceOf[SupportsDelta]
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from 
$table")
+    }
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both 
resolved.")
+
+    operationResolved && rowAttrsResolved && rowIdAttrsResolved && 
metadataAttrsResolved
+  }
+
+  private def operationResolved: Boolean = {
+    val attr = query.output.head
+    attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType == 
IntegerType && !attr.nullable
+  }
+
+  // validates row projection output is compatible with table attributes
+  private def rowAttrsResolved: Boolean = {
+    table.skipSchemaResolution || (projections.rowProjection match {
+      case Some(projection) =>
+        table.output.size == projection.schema.size &&
+          projection.schema.zip(table.output).forall { case (field, outAttr) =>
+            isCompatible(field, outAttr)
+          }
+      case None =>
+        true
+    })
+  }
+
+  // validates row ID projection output is compatible with row ID attributes
+  private def rowIdAttrsResolved: Boolean = {
+    val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+      operation.rowId,
+      originalTable)
+
+    val projectionSchema = projections.rowIdProjection.schema
+    rowIdAttrs.size == projectionSchema.size && projectionSchema.forall { 
field =>
+      rowIdAttrs.exists(rowIdAttr => isCompatible(field, rowIdAttr))
+    }
+  }
+
+  // validates metadata projection output is compatible with metadata 
attributes
+  private def metadataAttrsResolved: Boolean = {
+    projections.metadataProjection match {
+      case Some(projection) =>
+        val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+          operation.requiredMetadataAttributes,
+          originalTable)
+
+        val projectionSchema = projection.schema
+        metadataAttrs.size == projectionSchema.size && projectionSchema.forall 
{ field =>
+          metadataAttrs.exists(metadataAttr => isCompatible(field, 
metadataAttr))
+        }
+      case None =>
+        true
+    }
+  }
+
+  // checks if a projection field is compatible with a table attribute
+  private def isCompatible(inField: StructField, outAttr: NamedExpression): 
Boolean = {
+    val inType = 
CharVarcharUtils.getRawType(inField.metadata).getOrElse(inField.dataType)
+    val outType = 
CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
+    // names and types must match, nullability must be compatible
+    inField.name == outAttr.name &&
+      DataType.equalsIgnoreCompatibleNullability(inType, outType) &&
+      (outAttr.nullable || !inField.nullable)
+  }
+
+  override def withNewQuery(newQuery: LogicalPlan): V2WriteCommand = 
copy(query = newQuery)
+
+  override def withNewTable(newTable: NamedRelation): V2WriteCommand = 
copy(table = newTable)
+
+  // WriteDelta has no v1 fallback
+  override def storeAnalyzedQuery(): Command = this
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
WriteDelta = {
+    copy(query = newChild)
+  }
+}
+
 /** A trait used for logical plan nodes that create or replace V2 table 
definitions. */
 trait V2CreateTablePlan extends LogicalPlan {
   def tableName: Identifier
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala
new file mode 100644
index 00000000000..57f2a91092e
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+/**
+ * A utility that holds constants for handling deltas of rows.
+ */
+object RowDeltaUtils {
+  final val OPERATION_COLUMN: String = "__row_operation"
+  final val DELETE_OPERATION: Int = 1
+  final val UPDATE_OPERATION: Int = 2
+  final val INSERT_OPERATION: Int = 3
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/WriteDeltaProjections.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/WriteDeltaProjections.scala
new file mode 100644
index 00000000000..90f0be60c53
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/WriteDeltaProjections.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.ProjectingInternalRow
+
+case class WriteDeltaProjections(
+    rowProjection: Option[ProjectingInternalRow],
+    rowIdProjection: ProjectingInternalRow,
+    metadataProjection: Option[ProjectingInternalRow])
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 5fe141e7286..c415fb91c5d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3418,4 +3418,10 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
     SparkException.internalError(
       "The operation `dataType` is not supported.")
   }
+
+  def nullableRowIdError(nullableRowIdAttrs: Seq[AttributeReference]): 
Throwable = {
+    new AnalysisException(
+      errorClass = "NULLABLE_ROW_ID_ATTRIBUTES",
+      messageParameters = Map("nullableRowIdAttrs" -> 
nullableRowIdAttrs.mkString(", ")))
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 12653cae402..28b68a71a47 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -232,6 +232,17 @@ abstract class InMemoryBaseTable(
     dataMap(key).clear()
   }
 
+  def withDeletes(data: Array[BufferedRows]): InMemoryBaseTable = {
+    data.foreach { p =>
+      dataMap ++= dataMap.map { case (key, currentRows) =>
+        val newRows = new BufferedRows(currentRows.key)
+        newRows.rows ++= currentRows.rows.filter(r => 
!p.deletes.contains(r.getInt(0)))
+        key -> newRows
+      }
+    }
+    this
+  }
+
   def withData(data: Array[BufferedRows]): InMemoryBaseTable = {
     withData(data, schema)
   }
@@ -521,6 +532,7 @@ object InMemoryBaseTable {
 class BufferedRows(val key: Seq[Any] = Seq.empty) extends WriterCommitMessage
     with InputPartition with HasPartitionKey with Serializable {
   val rows = new mutable.ArrayBuffer[InternalRow]()
+  val deletes = new mutable.ArrayBuffer[Int]()
 
   def withRow(row: InternalRow): BufferedRows = {
     rows.append(row)
@@ -615,7 +627,7 @@ private object BufferedRowsWriterFactory extends 
DataWriterFactory with Streamin
 }
 
 private class BufferWriter extends DataWriter[InternalRow] {
-  private val buffer = new BufferedRows
+  protected val buffer = new BufferedRows
 
   override def write(row: InternalRow): Unit = buffer.rows.append(row.copy())
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index 08c22a02b85..4db15efe9eb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.connector.catalog
 
 import java.util
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.distributions.{Distribution, 
Distributions}
 import org.apache.spark.sql.connector.expressions.{FieldReference, 
LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, 
RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, 
RowLevelOperationInfo, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, 
DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, 
LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, 
RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, 
SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -34,17 +35,22 @@ class InMemoryRowLevelOperationTable(
     properties: util.Map[String, String])
   extends InMemoryTable(name, schema, partitioning, properties) with 
SupportsRowLevelOperations {
 
+  private final val PARTITION_COLUMN_REF = 
FieldReference(PartitionKeyColumn.name)
+  private final val SUPPORTS_DELTAS = "supports-deltas"
+
   // used in row-level operation tests to verify replaced partitions
   var replacedPartitions: Seq[Seq[Any]] = Seq.empty
 
   override def newRowLevelOperationBuilder(
       info: RowLevelOperationInfo): RowLevelOperationBuilder = {
-    () => PartitionBasedOperation(info.command)
+    if (properties.getOrDefault(SUPPORTS_DELTAS, "false") == "true") {
+      () => DeltaBasedOperation(info.command)
+    } else {
+      () => PartitionBasedOperation(info.command)
+    }
   }
 
   case class PartitionBasedOperation(command: Command) extends 
RowLevelOperation {
-    private final val PARTITION_COLUMN_REF = 
FieldReference(PartitionKeyColumn.name)
-
     var configuredScan: InMemoryBatchScan = _
 
     override def requiredMetadataAttributes(): Array[NamedReference] = {
@@ -97,4 +103,74 @@ class InMemoryRowLevelOperationTable(
       withData(newData, schema)
     }
   }
+
+  case class DeltaBasedOperation(command: Command) extends RowLevelOperation 
with SupportsDelta {
+    private final val PK_COLUMN_REF = FieldReference("pk")
+
+    override def requiredMetadataAttributes(): Array[NamedReference] = {
+      Array(PARTITION_COLUMN_REF)
+    }
+
+    override def rowId(): Array[NamedReference] = Array(PK_COLUMN_REF)
+
+    override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
+      new InMemoryScanBuilder(schema)
+    }
+
+    override def newWriteBuilder(info: LogicalWriteInfo): DeltaWriteBuilder =
+      new DeltaWriteBuilder {
+        override def build(): DeltaWrite = new DeltaWrite with 
RequiresDistributionAndOrdering {
+
+          override def requiredDistribution(): Distribution = {
+            Distributions.clustered(Array(PARTITION_COLUMN_REF))
+          }
+
+          override def requiredOrdering(): Array[SortOrder] = {
+            Array[SortOrder](
+              LogicalExpressions.sort(
+                PARTITION_COLUMN_REF,
+                SortDirection.ASCENDING,
+                SortDirection.ASCENDING.defaultNullOrdering())
+            )
+          }
+
+          override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite
+        }
+      }
+  }
+
+  private object TestDeltaBatchWrite extends DeltaBatchWrite {
+    override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DeltaWriterFactory = {
+      DeltaBufferedRowsWriterFactory
+    }
+
+    override def commit(messages: Array[WriterCommitMessage]): Unit = {
+      withDeletes(messages.map(_.asInstanceOf[BufferedRows]))
+      withData(messages.map(_.asInstanceOf[BufferedRows]))
+    }
+
+    override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  }
+}
+
+private object DeltaBufferedRowsWriterFactory extends DeltaWriterFactory {
+  override def createWriter(partitionId: Int, taskId: Long): 
DeltaWriter[InternalRow] = {
+    new DeltaBufferWriter
+  }
+}
+
+private class DeltaBufferWriter extends BufferWriter with 
DeltaWriter[InternalRow] {
+
+  override def delete(meta: InternalRow, id: InternalRow): Unit = 
buffer.deletes += id.getInt(0)
+
+  override def update(meta: InternalRow, id: InternalRow, row: InternalRow): 
Unit = {
+    buffer.deletes += id.getInt(0)
+    write(row)
+  }
+
+  override def insert(row: InternalRow): Unit = write(row)
+
+  override def write(row: InternalRow): Unit = super[BufferWriter].write(row)
+
+  override def commit(): WriterCommitMessage = buffer
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1e2a65d9ec2..29f0da1158f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -312,6 +312,11 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       // use the original relation to refresh the cache
       ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil
 
+    case WriteDelta(_: DataSourceV2Relation, _, query, r: 
DataSourceV2Relation, projections,
+        Some(write)) =>
+      // use the original relation to refresh the cache
+      WriteDeltaExec(planLater(query), refreshCache(r), projections, write) :: 
Nil
+
     case WriteToContinuousDataSource(writer, query, customMetrics) =>
       WriteToContinuousDataSourceExec(writer, planLater(query), customMetrics) 
:: Nil
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
index 0302928229f..abb9d728c78 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
DeleteFromTableWithFilters, LogicalPlan, ReplaceData, RowLevelWrite}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
DeleteFromTableWithFilters, LogicalPlan, ReplaceData, RowLevelWrite, WriteDelta}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, 
TruncatableTable}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -77,6 +77,10 @@ object OptimizeMetadataOnlyDeleteFromTable extends 
Rule[LogicalPlan] with Predic
         val command = rd.operation.command
         Some(rd, command, cond, originalTable)
 
+      case wd @ WriteDelta(_, cond, _, originalTable, _, _) =>
+        val command = wd.operation.command
+        Some(wd, command, cond, originalTable)
+
       case _ =>
         None
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
index 5e0ade2474a..8f7fed561c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, 
OverwriteByExpression, OverwritePartitionsDynamic, Project, ReplaceData}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, 
OverwriteByExpression, OverwritePartitionsDynamic, Project, ReplaceData, 
WriteDelta}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, 
SupportsDynamicOverwrite, SupportsOverwriteV2, SupportsTruncate, Write, 
WriteBuilder}
+import org.apache.spark.sql.connector.write.{DeltaWriteBuilder, 
LogicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwriteV2, 
SupportsTruncate, Write, WriteBuilder}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, 
WriteToMicroBatchDataSource}
 import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
@@ -102,6 +103,11 @@ object V2Writes extends Rule[LogicalPlan] with 
PredicateHelper {
       // project away any metadata columns that could be used for distribution 
and ordering
       rd.copy(write = Some(write), query = Project(rd.dataInput, newQuery))
 
+    case wd @ WriteDelta(r: DataSourceV2Relation, _, query, _, projections, 
None) =>
+      val deltaWriteBuilder = newDeltaWriteBuilder(r.table, Map.empty, 
projections)
+      val deltaWrite = deltaWriteBuilder.build()
+      val newQuery = DistributionAndOrderingUtils.prepareQuery(deltaWrite, 
query, r.funCatalog)
+      wd.copy(write = Some(deltaWrite), query = newQuery)
   }
 
   private def buildWriteForMicroBatch(
@@ -137,4 +143,26 @@ object V2Writes extends Rule[LogicalPlan] with 
PredicateHelper {
     val info = LogicalWriteInfoImpl(queryId, rowSchema, writeOptions.asOptions)
     table.asWritable.newWriteBuilder(info)
   }
+
+  private def newDeltaWriteBuilder(
+      table: Table,
+      writeOptions: Map[String, String],
+      projections: WriteDeltaProjections,
+      queryId: String = UUID.randomUUID().toString): DeltaWriteBuilder = {
+
+    val rowSchema = 
projections.rowProjection.map(_.schema).getOrElse(StructType(Nil))
+    val rowIdSchema = projections.rowIdProjection.schema
+    val metadataSchema = projections.metadataProjection.map(_.schema)
+
+    val info = LogicalWriteInfoImpl(
+      queryId,
+      rowSchema,
+      writeOptions.asOptions,
+      Optional.of(rowIdSchema),
+      Optional.ofNullable(metadataSchema.orNull))
+
+    val writeBuilder = table.asWritable.newWriteBuilder(info)
+    assert(writeBuilder.isInstanceOf[DeltaWriteBuilder], s"$writeBuilder must 
be DeltaWriteBuilder")
+    writeBuilder.asInstanceOf[DeltaWriteBuilder]
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index ee820fd597d..490b7082223 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -21,18 +21,19 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TableSpec, 
UnaryNode}
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, 
WriteDeltaProjections}
+import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, 
INSERT_OPERATION, UPDATE_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, 
StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
LogicalWriteInfoImpl, PhysicalWriteInfoImpl, V1Write, Write, 
WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, 
DataWriterFactory, DeltaWrite, DeltaWriter, LogicalWriteInfoImpl, 
PhysicalWriteInfoImpl, V1Write, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, 
SQLMetrics}
@@ -298,6 +299,30 @@ case class ReplaceDataExec(
   }
 }
 
+/**
+ * Physical plan node to write a delta of rows to an existing table.
+ */
+case class WriteDeltaExec(
+    query: SparkPlan,
+    refreshCache: () => Unit,
+    projections: WriteDeltaProjections,
+    write: DeltaWrite) extends V2ExistingTableWriteExec {
+
+  override lazy val stringArgs: Iterator[Any] = Iterator(query, write)
+
+  override lazy val writingTask: WritingSparkTask[_] = {
+    if (projections.metadataProjection.isDefined) {
+      DeltaWithMetadataWritingSparkTask(projections)
+    } else {
+      DeltaWritingSparkTask(projections)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
WriteDeltaExec = {
+    copy(query = newChild)
+  }
+}
+
 case class WriteToDataSourceV2Exec(
     batchWrite: BatchWrite,
     refreshCache: () => Unit,
@@ -339,6 +364,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
  */
 trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
   def query: SparkPlan
+  def writingTask: WritingSparkTask[_] = DataWritingSparkTask
 
   var commitProgress: Option[StreamWriterCommitProgress] = None
 
@@ -360,6 +386,8 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode {
         tempRdd
       }
     }
+    // introduce a local var to avoid serializing the whole class
+    val task = writingTask
     val writerFactory = batchWrite.createBatchWriterFactory(
       PhysicalWriteInfoImpl(rdd.getNumPartitions))
     val useCommitCoordinator = batchWrite.useCommitCoordinator
@@ -376,8 +404,7 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode {
       sparkContext.runJob(
         rdd,
         (context: TaskContext, iter: Iterator[InternalRow]) =>
-          DataWritingSparkTask.run(writerFactory, context, iter, 
useCommitCoordinator,
-            writeMetrics),
+          task.run(writerFactory, context, iter, useCommitCoordinator, 
writeMetrics),
         rdd.partitions.indices,
         (index, result: DataWritingSparkTaskResult) => {
           val commitMessage = result.writerCommitMessage
@@ -410,7 +437,10 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode {
   }
 }
 
-object DataWritingSparkTask extends Logging {
+trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with 
Serializable {
+
+  protected def write(writer: W, row: InternalRow): Unit
+
   def run(
       writerFactory: DataWriterFactory,
       context: TaskContext,
@@ -422,7 +452,7 @@ object DataWritingSparkTask extends Logging {
     val partId = context.partitionId()
     val taskId = context.taskAttemptId()
     val attemptId = context.attemptNumber()
-    val dataWriter = writerFactory.createWriter(partId, taskId)
+    val dataWriter = writerFactory.createWriter(partId, taskId).asInstanceOf[W]
 
     var count = 0L
     // write the data and commit this writer.
@@ -434,7 +464,7 @@ object DataWritingSparkTask extends Logging {
 
         // Count is here.
         count += 1
-        dataWriter.write(iter.next())
+        write(dataWriter, iter.next())
       }
 
       CustomMetrics.updateMetrics(dataWriter.currentMetricsValues, 
customMetrics)
@@ -477,6 +507,73 @@ object DataWritingSparkTask extends Logging {
   }
 }
 
+object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] {
+  override protected def write(writer: DataWriter[InternalRow], row: 
InternalRow): Unit = {
+    writer.write(row)
+  }
+}
+
+case class DeltaWritingSparkTask(
+    projections: WriteDeltaProjections) extends 
WritingSparkTask[DeltaWriter[InternalRow]] {
+
+  private lazy val rowProjection = projections.rowProjection.orNull
+  private lazy val rowIdProjection = projections.rowIdProjection
+
+  override protected def write(writer: DeltaWriter[InternalRow], row: 
InternalRow): Unit = {
+    val operation = row.getInt(0)
+
+    operation match {
+      case DELETE_OPERATION =>
+        rowIdProjection.project(row)
+        writer.delete(null, rowIdProjection)
+
+      case UPDATE_OPERATION =>
+        rowProjection.project(row)
+        rowIdProjection.project(row)
+        writer.update(null, rowIdProjection, rowProjection)
+
+      case INSERT_OPERATION =>
+        rowProjection.project(row)
+        writer.insert(rowProjection)
+
+      case other =>
+        throw new SparkException(s"Unexpected operation ID: $other")
+    }
+  }
+}
+
+case class DeltaWithMetadataWritingSparkTask(
+    projections: WriteDeltaProjections) extends 
WritingSparkTask[DeltaWriter[InternalRow]] {
+
+  private lazy val rowProjection = projections.rowProjection.orNull
+  private lazy val rowIdProjection = projections.rowIdProjection
+  private lazy val metadataProjection = projections.metadataProjection.orNull
+
+  override protected def write(writer: DeltaWriter[InternalRow], row: 
InternalRow): Unit = {
+    val operation = row.getInt(0)
+
+    operation match {
+      case DELETE_OPERATION =>
+        rowIdProjection.project(row)
+        metadataProjection.project(row)
+        writer.delete(metadataProjection, rowIdProjection)
+
+      case UPDATE_OPERATION =>
+        rowProjection.project(row)
+        rowIdProjection.project(row)
+        metadataProjection.project(row)
+        writer.update(metadataProjection, rowIdProjection, rowProjection)
+
+      case INSERT_OPERATION =>
+        rowProjection.project(row)
+        writer.insert(rowProjection)
+
+      case other =>
+        throw new SparkException(s"Unexpected operation ID: $other")
+    }
+  }
+}
+
 private[v2] trait TableWriteExecHelper extends V2TableWriteExec with 
SupportsV1Write {
   protected def writeToTable(
       catalog: TableCatalog,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index d9a12b47ec2..14b951e66db 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryRowLevelOpera
 import org.apache.spark.sql.connector.expressions.LogicalExpressions._
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, 
ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, 
ReplaceDataExec, WriteDeltaExec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.QueryExecutionListener
@@ -50,6 +51,10 @@ abstract class DeleteFromTableSuiteBase
   protected val ident: Identifier = Identifier.of(namespace, "test_table")
   protected val tableNameAsString: String = "cat." + ident.toString
 
+  protected def extraTableProps: java.util.Map[String, String] = {
+    Collections.emptyMap[String, String]
+  }
+
   protected def catalog: InMemoryRowLevelOperationTableCatalog = {
     val catalog = spark.sessionState.catalogManager.catalog("cat")
     catalog.asTableCatalog.asInstanceOf[InMemoryRowLevelOperationTableCatalog]
@@ -70,7 +75,7 @@ abstract class DeleteFromTableSuiteBase
   }
 
   test("delete from empty tables") {
-    createTable("id INT, dep STRING")
+    createTable("pk INT NOT NULL, id INT, dep STRING")
 
     sql(s"DELETE FROM $tableNameAsString WHERE id <= 1")
 
@@ -78,76 +83,76 @@ abstract class DeleteFromTableSuiteBase
   }
 
   test("delete with basic filters") {
-    createAndInitTable("id INT, dep STRING",
-      """{ "id": 1, "dep": "hr" }
-        |{ "id": 2, "dep": "software" }
-        |{ "id": 3, "dep": "hr" }
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": 3, "dep": "hr" }
         |""".stripMargin)
 
     sql(s"DELETE FROM $tableNameAsString WHERE id <= 1")
 
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
-      Row(2, "software") :: Row(3, "hr") :: Nil)
+      Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil)
   }
 
   test("delete with aliases") {
-    createAndInitTable("id INT, dep STRING",
-      """{ "id": 1, "dep": "hr" }
-        |{ "id": 2, "dep": "software" }
-        |{ "id": 3, "dep": "hr" }
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": 3, "dep": "hr" }
         |""".stripMargin)
 
     sql(s"DELETE FROM $tableNameAsString AS t WHERE t.id <= 1 OR t.dep = 'hr'")
 
-    checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, "software") 
:: Nil)
+    checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, 
"software") :: Nil)
   }
 
   test("delete with IN predicates") {
-    createAndInitTable("id INT, dep STRING",
-      """{ "id": 1, "dep": "hr" }
-        |{ "id": 2, "dep": "software" }
-        |{ "id": null, "dep": "hr" }
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": null, "dep": "hr" }
         |""".stripMargin)
 
     sql(s"DELETE FROM $tableNameAsString WHERE id IN (1, null)")
 
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
-      Row(2, "software") :: Row(null, "hr") :: Nil)
+      Row(2, 2, "software") :: Row(3, null, "hr") :: Nil)
   }
 
   test("delete with NOT IN predicates") {
-    createAndInitTable("id INT, dep STRING",
-      """{ "id": 1, "dep": "hr" }
-        |{ "id": 2, "dep": "software" }
-        |{ "id": null, "dep": "hr" }
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": null, "dep": "hr" }
         |""".stripMargin)
 
     sql(s"DELETE FROM $tableNameAsString WHERE id NOT IN (null, 1)")
 
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
-      Row(1, "hr") :: Row(2, "software") :: Row(null, "hr") :: Nil)
+      Row(1, 1, "hr") :: Row(2, 2, "software") :: Row(3, null, "hr") :: Nil)
 
     sql(s"DELETE FROM $tableNameAsString WHERE id NOT IN (1, 10)")
 
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
-      Row(1, "hr") :: Row(null, "hr") :: Nil)
+      Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil)
   }
 
   test("delete with conditions on nested columns") {
-    createAndInitTable("id INT, complex STRUCT<c1:INT,c2:STRING>, dep STRING",
-      """{ "id": 1, "complex": { "c1": 3, "c2": "v1" }, "dep": "hr" }
-        |{ "id": 2, "complex": { "c1": 2, "c2": "v2" }, "dep": "software" }
+    createAndInitTable("pk INT NOT NULL, id INT, complex 
STRUCT<c1:INT,c2:STRING>, dep STRING",
+      """{ "pk": 1, "id": 1, "complex": { "c1": 3, "c2": "v1" }, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "complex": { "c1": 2, "c2": "v2" }, "dep": 
"software" }
         |""".stripMargin)
 
     sql(s"DELETE FROM $tableNameAsString WHERE complex.c1 = id + 2")
 
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
-      Row(2, Row(2, "v2"), "software") :: Nil)
+      Row(2, 2, Row(2, "v2"), "software") :: Nil)
 
     sql(s"DELETE FROM $tableNameAsString t WHERE t.complex.c1 = id")
 
@@ -156,10 +161,10 @@ abstract class DeleteFromTableSuiteBase
 
   test("delete with IN subqueries") {
     withTempView("deleted_id", "deleted_dep") {
-      createAndInitTable("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": 2, "dep": "hardware" }
-          |{ "id": null, "dep": "hr" }
+      createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 1, "id": 1, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "dep": "hardware" }
+          |{ "pk": 3, "id": null, "dep": "hr" }
           |""".stripMargin)
 
       val deletedIdDF = Seq(Some(0), Some(1), None).toDF()
@@ -178,16 +183,16 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(2, "hardware") :: Row(null, "hr") :: Nil)
+        Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
 
-      append("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": -1, "dep": "hr" }
+      append("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 4, "id": 1, "dep": "hr" }
+          |{ "pk": 5, "id": -1, "dep": "hr" }
           |""".stripMargin)
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(-1, "hr") :: Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr") 
:: Nil)
+        Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, 
null, "hr") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString
@@ -199,16 +204,16 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(-1, "hr") :: Row(1, "hr") :: Nil)
+        Row(5, -1, "hr") :: Row(4, 1, "hr") :: Nil)
 
-      append("id INT, dep STRING",
-        """{ "id": null, "dep": "hr" }
-          |{ "id": 2, "dep": "hr" }
+      append("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 6, "id": null, "dep": "hr" }
+          |{ "pk": 7, "id": 2, "dep": "hr" }
           |""".stripMargin)
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(-1, "hr") :: Row(1, "hr") :: Row(2, "hr") :: Row(null, "hr") :: 
Nil)
+        Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(7, 2, "hr") :: Row(6, null, 
"hr") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString
@@ -220,16 +225,16 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(-1, "hr") :: Row(1, "hr") :: Row(null, "hr") :: Nil)
+        Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(6, null, "hr") :: Nil)
     }
   }
 
   test("delete with multi-column IN subqueries") {
     withTempView("deleted_employee") {
-      createAndInitTable("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": 2, "dep": "hardware" }
-          |{ "id": null, "dep": "hr" }
+      createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 1, "id": 1, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "dep": "hardware" }
+          |{ "pk": 3, "id": null, "dep": "hr" }
           |""".stripMargin)
 
       val deletedEmployeeDF = Seq((None, "hr"), (Some(1), "hr")).toDF()
@@ -243,16 +248,16 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(2, "hardware") :: Row(null, "hr") :: Nil)
+        Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
     }
   }
 
   test("delete with NOT IN subqueries") {
     withTempView("deleted_id", "deleted_dep") {
-      createAndInitTable("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": 2, "dep": "hardware" }
-          |{ "id": null, "dep": "hr" }
+      createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 1, "id": 1, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "dep": "hardware" }
+          |{ "pk": 3, "id": null, "dep": "hr" }
           |""".stripMargin)
 
       val deletedIdDF = Seq(Some(-1), Some(-2), None).toDF()
@@ -269,7 +274,7 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr") :: Nil)
+        Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString
@@ -277,17 +282,17 @@ abstract class DeleteFromTableSuiteBase
            | id NOT IN (SELECT * FROM deleted_id WHERE value IS NOT NULL)
            |""".stripMargin)
 
-      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(null, "hr") :: 
Nil)
+      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(3, null, "hr") 
:: Nil)
 
-      append("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": 2, "dep": "hardware" }
-          |{ "id": null, "dep": "hr" }
+      append("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 4, "id": 1, "dep": "hr" }
+          |{ "pk": 5, "id": 2, "dep": "hardware" }
+          |{ "pk": 6, "id": null, "dep": "hr" }
           |""".stripMargin)
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr") :: Row(null, 
"hr") :: Nil)
+        Row(4, 1, "hr") :: Row(5, 2, "hardware") :: Row(3, null, "hr") :: 
Row(6, null, "hr") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString
@@ -297,7 +302,7 @@ abstract class DeleteFromTableSuiteBase
            | dep IN ('software', 'hr')
            |""".stripMargin)
 
-      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, "hardware") 
:: Nil)
+      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(5, 2, 
"hardware") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString
@@ -307,7 +312,7 @@ abstract class DeleteFromTableSuiteBase
            | EXISTS (SELECT 1 FROM FROM deleted_dep WHERE dep = 
deleted_dep.value)
            |""".stripMargin)
 
-      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, "hardware") 
:: Nil)
+      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(5, 2, 
"hardware") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString t
@@ -323,10 +328,10 @@ abstract class DeleteFromTableSuiteBase
 
   test("delete with EXISTS subquery") {
     withTempView("deleted_id", "deleted_dep") {
-      createAndInitTable("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": 2, "dep": "hardware" }
-          |{ "id": null, "dep": "hr" }
+      createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 1, "id": 1, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "dep": "hardware" }
+          |{ "pk": 3, "id": null, "dep": "hr" }
           |""".stripMargin)
 
       val deletedIdDF = Seq(Some(-1), Some(-2), None).toDF()
@@ -343,7 +348,7 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr") :: Nil)
+        Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString t
@@ -353,7 +358,7 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(2, "hardware") :: Row(null, "hr") :: Nil)
+        Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString t
@@ -363,7 +368,7 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(2, "hardware") :: Nil)
+        Row(2, 2, "hardware") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString t
@@ -375,16 +380,16 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(2, "hardware") :: Nil)
+        Row(2, 2, "hardware") :: Nil)
     }
   }
 
   test("delete with NOT EXISTS subquery") {
     withTempView("deleted_id", "deleted_dep") {
-      createAndInitTable("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": 2, "dep": "hardware" }
-          |{ "id": null, "dep": "hr" }
+      createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 1, "id": 1, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "dep": "hardware" }
+          |{ "pk": 3, "id": null, "dep": "hr" }
           |""".stripMargin)
 
       val deletedIdDF = Seq(Some(-1), Some(-2), None).toDF()
@@ -403,7 +408,7 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(1, "hr") :: Row(null, "hr") :: Nil)
+        Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString t
@@ -411,7 +416,7 @@ abstract class DeleteFromTableSuiteBase
            | NOT EXISTS (SELECT 1 FROM deleted_id d WHERE t.id = d.value + 2)
            |""".stripMargin)
 
-      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(1, "hr") :: 
Nil)
+      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: 
Nil)
 
       sql(
         s"""DELETE FROM $tableNameAsString t
@@ -427,10 +432,10 @@ abstract class DeleteFromTableSuiteBase
 
   test("delete with a scalar subquery") {
     withTempView("deleted_id") {
-      createAndInitTable("id INT, dep STRING",
-        """{ "id": 1, "dep": "hr" }
-          |{ "id": 2, "dep": "hardware" }
-          |{ "id": null, "dep": "hr" }
+      createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+        """{ "pk": 1, "id": 1, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "dep": "hardware" }
+          |{ "pk": 3, "id": null, "dep": "hr" }
           |""".stripMargin)
 
       val deletedIdDF = Seq(Some(1), Some(100), None).toDF()
@@ -444,18 +449,18 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(2, "hardware") :: Row(null, "hr") :: Nil)
+        Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
     }
   }
 
   test("delete refreshes relation cache") {
     withTempView("temp") {
       withCache("temp") {
-        createAndInitTable("id INT, dep STRING",
-          """{ "id": 1, "dep": "hr" }
-            |{ "id": 1, "dep": "hardware" }
-            |{ "id": 2, "dep": "hardware" }
-            |{ "id": 3, "dep": "hr" }
+        createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+          """{ "pk": 1, "id": 1, "dep": "hr" }
+            |{ "pk": 2, "id": 1, "dep": "hardware" }
+            |{ "pk": 3, "id": 2, "dep": "hardware" }
+            |{ "pk": 4, "id": 3, "dep": "hr" }
             |""".stripMargin)
 
         // define a view on top of the table
@@ -468,7 +473,7 @@ abstract class DeleteFromTableSuiteBase
         // verify the view returns expected results
         checkAnswer(
           sql("SELECT * FROM temp"),
-          Row(1, "hr") :: Row(1, "hardware") :: Nil)
+          Row(1, 1, "hr") :: Row(2, 1, "hardware") :: Nil)
 
         // delete some records from the table
         sql(s"DELETE FROM $tableNameAsString WHERE id <= 1")
@@ -476,7 +481,7 @@ abstract class DeleteFromTableSuiteBase
         // verify the delete was successful
         checkAnswer(
           sql(s"SELECT * FROM $tableNameAsString"),
-          Row(2, "hardware") :: Row(3, "hr") :: Nil)
+          Row(3, 2, "hardware") :: Row(4, 3, "hr") :: Nil)
 
         // verify the view reflects the changes in the table
         checkAnswer(sql("SELECT * FROM temp"), Nil)
@@ -485,10 +490,10 @@ abstract class DeleteFromTableSuiteBase
   }
 
   test("delete with nondeterministic conditions") {
-    createAndInitTable("id INT, dep STRING",
-      """{ "id": 1, "dep": "hr" }
-        |{ "id": 2, "dep": "software" }
-        |{ "id": 3, "dep": "hr" }
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": 3, "dep": "hr" }
         |""".stripMargin)
 
     val e = intercept[AnalysisException] {
@@ -498,10 +503,10 @@ abstract class DeleteFromTableSuiteBase
   }
 
   test("delete without condition executed as delete with filters") {
-    createAndInitTable("id INT, dep INT",
-      """{ "id": 1, "dep": 100 }
-        |{ "id": 2, "dep": 200 }
-        |{ "id": 3, "dep": 100 }
+    createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+      """{ "pk": 1, "id": 1, "dep": 100 }
+        |{ "pk": 2, "id": 2, "dep": 200 }
+        |{ "pk": 3, "id": 3, "dep": 100 }
         |""".stripMargin)
 
     executeDeleteWithFilters(s"DELETE FROM $tableNameAsString")
@@ -510,39 +515,39 @@ abstract class DeleteFromTableSuiteBase
   }
 
   test("delete with supported predicates gets converted into delete with 
filters") {
-    createAndInitTable("id INT, dep INT",
-      """{ "id": 1, "dep": 100 }
-        |{ "id": 2, "dep": 200 }
-        |{ "id": 3, "dep": 100 }
+    createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+      """{ "pk": 1, "id": 1, "dep": 100 }
+        |{ "pk": 2, "id": 2, "dep": 200 }
+        |{ "pk": 3, "id": 3, "dep": 100 }
         |""".stripMargin)
 
     executeDeleteWithFilters(s"DELETE FROM $tableNameAsString WHERE dep = 100")
 
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
-      Row(2, 200) :: Nil)
+      Row(2, 2, 200) :: Nil)
   }
 
   test("delete with unsupported predicates cannot be converted into delete 
with filters") {
-    createAndInitTable("id INT, dep INT",
-      """{ "id": 1, "dep": 100 }
-        |{ "id": 2, "dep": 200 }
-        |{ "id": 3, "dep": 100 }
+    createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+      """{ "pk": 1, "id": 1, "dep": 100 }
+        |{ "pk": 2, "id": 2, "dep": 200 }
+        |{ "pk": 3, "id": 3, "dep": 100 }
         |""".stripMargin)
 
     executeDeleteWithRewrite(s"DELETE FROM $tableNameAsString WHERE dep = 100 
OR dep < 200")
 
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
-      Row(2, 200) :: Nil)
+      Row(2, 2, 200) :: Nil)
   }
 
   test("delete with subquery cannot be converted into delete with filters") {
     withTempView("deleted_id") {
-      createAndInitTable("id INT, dep INT",
-        """{ "id": 1, "dep": 100 }
-          |{ "id": 2, "dep": 200 }
-          |{ "id": 3, "dep": 100 }
+      createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+        """{ "pk": 1, "id": 1, "dep": 100 }
+          |{ "pk": 2, "id": 2, "dep": 200 }
+          |{ "pk": 3, "id": 3, "dep": 100 }
           |""".stripMargin)
 
       val deletedIdDF = Seq(Some(1), Some(100), None).toDF()
@@ -553,14 +558,13 @@ abstract class DeleteFromTableSuiteBase
 
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
-        Row(2, 200) :: Row(3, 100) :: Nil)
+        Row(2, 2, 200) :: Row(3, 3, 100) :: Nil)
     }
   }
 
   protected def createTable(schemaString: String): Unit = {
     val schema = StructType.fromDDL(schemaString)
-    val tableProps = Collections.emptyMap[String, String]
-    catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))), 
tableProps)
+    catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))), 
extraTableProps)
   }
 
   protected def createAndInitTable(schemaString: String, jsonData: String): 
Unit = {
@@ -569,8 +573,10 @@ abstract class DeleteFromTableSuiteBase
   }
 
   private def append(schemaString: String, jsonData: String): Unit = {
-    val df = toDF(jsonData, schemaString)
-    df.coalesce(1).writeTo(tableNameAsString).append()
+    
withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key 
-> "true") {
+      val df = toDF(jsonData, schemaString)
+      df.coalesce(1).writeTo(tableNameAsString).append()
+    }
   }
 
   private def toDF(jsonData: String, schemaString: String = null): DataFrame = 
{
@@ -604,6 +610,8 @@ abstract class DeleteFromTableSuiteBase
     executedPlan match {
       case _: ReplaceDataExec =>
         // OK
+      case _: WriteDeltaExec =>
+        // OK
       case other =>
         fail("unexpected executed plan: " + other)
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
new file mode 100644
index 00000000000..fd7a04ea926
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.AnalysisException
+
+class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {
+
+  override protected lazy val extraTableProps: java.util.Map[String, String] = 
{
+    val props = new java.util.HashMap[String, String]()
+    props.put("supports-deltas", "true")
+    props
+  }
+
+  test("nullable row ID attrs") {
+    createAndInitTable("pk INT, salary INT, dep STRING",
+      """{ "pk": 1, "salary": 300, "dep": 'hr' }
+        |{ "pk": 2, "salary": 150, "dep": 'software' }
+        |{ "pk": 3, "salary": 120, "dep": 'hr' }
+        |""".stripMargin)
+
+    val exception = intercept[AnalysisException] {
+      sql(s"DELETE FROM $tableNameAsString WHERE pk = 1")
+    }
+    assert(exception.message.contains("Row ID attributes cannot be nullable"))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to