This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 570b32500844 [SPARK-52689][SQL] Send DML Metrics to V2Write
570b32500844 is described below

commit 570b32500844893b4ed23a934b584248918e7480
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Mon Jul 28 17:29:35 2025 +0800

    [SPARK-52689][SQL] Send DML Metrics to V2Write
    
    ### What changes were proposed in this pull request?
    Send some DML execution metrics (i.e. from MergeRowsExec) to the write of the V2 data source, so that it can persist them for debugging purposes.
    
    ### Why are the changes needed?
    DML row-level operations, i.e. MERGE, UPDATE and DELETE, are a critical piece of functionality for V2 data sources (like Iceberg). It would be nice if we could send some DML metrics to the commit of these data sources, so that they can persist them in commit metadata for debugging purposes.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51377 from szehon-ho/metric_to_write.
    
    Lead-authored-by: Szehon Ho <szehon.apa...@gmail.com>
    Co-authored-by: Szehon Ho <sze...@apache.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/connector/write/BatchWrite.java      |  45 ++++++++
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  15 +++
 .../catalog/InMemoryRowLevelOperationTable.scala   |  23 ++++-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  15 ++-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 115 ++++++++++++++++++++-
 5 files changed, 205 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index 8c068928415f..c8febd0fe493 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -19,6 +19,8 @@ package org.apache.spark.sql.connector.write;
 
 import org.apache.spark.annotation.Evolving;
 
+import java.util.Map;
+
 /**
  * An interface that defines how to write the data to data source for batch processing.
  * <p>
@@ -88,6 +90,49 @@ public interface BatchWrite {
    */
   void commit(WriterCommitMessage[] messages);
 
+  /**
+   * Commits this writing job with a list of commit messages and operation metrics.
+   * <p>
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
+   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
+   * <p>
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * tasks may have committed successfully and one successful commit message per task will be
+   * passed to this commit method. The remaining commit messages are ignored by Spark.
+   * <p>
+   * @param messages a list of commit messages from successful data writers, produced by
+   *                 {@link DataWriter#commit()}.
+   * @param metrics a map of operation metrics collected from the query producing the write.
+   *                The keys will be prefixed by the operation type, e.g. `merge`.
+   *                <p>
+   *                Currently supported metrics are:
+   *                <ul>
+   *                  <li>Operation Type = `merge`
+   *                    <ul>
+   *                      <li>`numTargetRowsCopied`: number of target rows copied unmodified because
+   *                      they did not match any action</li>
+   *                      <li>`numTargetRowsDeleted`: number of target rows deleted</li>
+   *                      <li>`numTargetRowsUpdated`: number of target rows updated</li>
+   *                      <li>`numTargetRowsInserted`: number of target rows inserted</li>
+   *                      <li>`numTargetRowsMatchedUpdated`: number of target rows updated by a
+   *                      matched clause</li>
+   *                      <li>`numTargetRowsMatchedDeleted`: number of target rows deleted by a
+   *                      matched clause</li>
+   *                      <li>`numTargetRowsNotMatchedBySourceUpdated`: number of target rows
+   *                      updated by a not matched by source clause</li>
+   *                      <li>`numTargetRowsNotMatchedBySourceDeleted`: number of target rows
+   *                      deleted by a not matched by source clause</li>
+   *                    </ul>
+   *                  </li>
+   *                </ul>
+   */
+  default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
+    commit(messages);
+  }
+
   /**
    * Aborts this writing job because some data writers are failed and keep failing when retry,
    * or the Spark job fails with some unknown reasons,
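
For connector authors, adopting the new API is just a matter of overriding the two-argument commit; the single-argument overload remains the default path. Below is a minimal sketch of such an override, assuming a hypothetical MetadataStore sink and a MetricsAwareBatchWrite class that are not part of this commit:

    import java.{lang, util}

    import scala.jdk.CollectionConverters._

    import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}

    // Hypothetical sink for commit metadata; not a Spark API.
    trait MetadataStore {
      def appendCommitSummary(summary: Map[String, String]): Unit
    }

    class MetricsAwareBatchWrite(factory: DataWriterFactory, store: MetadataStore) extends BatchWrite {

      override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = factory

      // New overload from this commit: persist the operation metrics (keys such as
      // "merge.numTargetRowsUpdated"), then run the normal commit path.
      override def commit(messages: Array[WriterCommitMessage], metrics: util.Map[String, lang.Long]): Unit = {
        store.appendCommitSummary(metrics.asScala.map { case (k, v) => k -> v.toString }.toMap)
        commit(messages)
      }

      override def commit(messages: Array[WriterCommitMessage]): Unit = {
        // Make the written data visible here.
      }

      override def abort(messages: Array[WriterCommitMessage]): Unit = {
        // Clean up any data left behind by failed or speculative tasks.
      }
    }

Sources that do not override the new overload keep the existing behavior, since the default implementation simply delegates to commit(messages).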
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 4efb08a67829..a3b10054a359 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -23,6 +23,7 @@ import java.util
 import java.util.OptionalLong
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import com.google.common.base.Objects
@@ -152,6 +153,8 @@ abstract class InMemoryBaseTable(
   // The key `Seq[Any]` is the partition values, value is a set of splits, each with a set of rows.
   val dataMap: mutable.Map[Seq[Any], Seq[BufferedRows]] = mutable.Map.empty
 
+  val commits: ListBuffer[Commit] = ListBuffer[Commit]()
+
   def data: Array[BufferedRows] = dataMap.values.flatten.toArray
 
   def rows: Seq[InternalRow] = dataMap.values.flatten.flatMap(_.rows).toSeq
@@ -616,6 +619,9 @@ abstract class InMemoryBaseTable(
   }
 
   protected abstract class TestBatchWrite extends BatchWrite {
+
+    var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String]
+
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
       BufferedRowsWriterFactory
     }
@@ -624,8 +630,11 @@ abstract class InMemoryBaseTable(
   }
 
   class Append(val info: LogicalWriteInfo) extends TestBatchWrite {
+
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -634,6 +643,8 @@ abstract class InMemoryBaseTable(
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       dataMap --= newData.flatMap(_.rows.map(getKey))
       withData(newData)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -641,6 +652,8 @@ abstract class InMemoryBaseTable(
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       dataMap.clear()
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -882,6 +895,8 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric {
   override def value(): Long = value
 }
 
+case class Commit(id: Long, properties: Map[String, String])
+
 sealed trait Operation
 case object Write extends Operation
 case object Delete extends Operation
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index aeb807768b07..100570a67a07 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import java.util
+import java.{lang, util}
+import java.time.Instant
+
+import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
@@ -111,7 +114,21 @@ class InMemoryRowLevelOperationTable(
     override def description(): String = "InMemoryPartitionReplaceOperation"
   }
 
-  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan) extends TestBatchWrite {
+  abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
+
+    override def commit(messages: Array[WriterCommitMessage],
+                        metrics: util.Map[String, lang.Long]): Unit = {
+      metrics.asScala.map {
+        case (key, value) => commitProperties += key -> String.valueOf(value)
+      }
+      commit(messages)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
+    }
+  }
+
+  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
+    extends RowLevelOperationBatchWrite {
 
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       val newData = messages.map(_.asInstanceOf[BufferedRows])
@@ -165,7 +182,7 @@ class InMemoryRowLevelOperationTable(
     }
   }
 
-  private object TestDeltaBatchWrite extends DeltaBatchWrite {
+  private object TestDeltaBatchWrite extends RowLevelOperationBatchWrite with DeltaBatchWrite {
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DeltaWriterFactory = {
       DeltaBufferedRowsWriterFactory
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 193662718ce4..5801f0580de2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.lang
+import java.util
+
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -34,6 +37,7 @@ import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
@@ -398,7 +402,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
 /**
  * The base physical plan for writing data into data source v2.
  */
-trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
+trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper {
   def query: SparkPlan
   def writingTask: WritingSparkTask[_] = DataWritingSparkTask
 
@@ -451,8 +455,9 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
         }
       )
 
+      val operationMetrics = getOperationMetrics(query)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
-      batchWrite.commit(messages)
+      batchWrite.commit(messages, operationMetrics)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
       commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
@@ -474,6 +479,12 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
 
     Nil
   }
+
+  private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
+      n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
+    }.getOrElse(Map.empty[String, lang.Long]).asJava
+  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
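
For reference, getOperationMetrics above only looks for a MergeRowsExec node in the (possibly adaptive) plan and prefixes its metric names with `merge.`, so a group-based MERGE like the one in the new test below hands the connector a map shaped roughly as follows (illustrative values taken from that test's assertions; the object wrapper exists only to make the snippet self-contained):

    import java.{lang, util}

    import scala.jdk.CollectionConverters._

    object MergeOperationMetricsExample {
      // Approximate contents of the metrics map passed to BatchWrite.commit for a
      // group-based MERGE that deletes two rows and inserts one (delta-based plans
      // report numTargetRowsCopied as 0).
      val operationMetrics: util.Map[String, lang.Long] = Map(
        "merge.numTargetRowsCopied" -> 3L,
        "merge.numTargetRowsInserted" -> 1L,
        "merge.numTargetRowsUpdated" -> 0L,
        "merge.numTargetRowsDeleted" -> 2L,
        "merge.numTargetRowsMatchedUpdated" -> 0L,
        "merge.numTargetRowsMatchedDeleted" -> 1L,
        "merge.numTargetRowsNotMatchedBySourceUpdated" -> 0L,
        "merge.numTargetRowsNotMatchedBySourceDeleted" -> 1L
      ).map { case (k, v) => k -> lang.Long.valueOf(v) }.asJava
    }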
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 21b171bee961..e5786619f98f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not}
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, TableInfo}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, InMemoryTable, TableInfo}
 import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1811,6 +1811,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(1, 1000, "hr"), // updated
           Row(2, 200, "software"),
           Row(3, 300, "hr")))
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1856,6 +1867,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(2, 200, "software"),
           Row(3, 300, "hr"),
           Row(5, 400, "executive"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === "0")
+      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1883,7 +1905,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
            |""".stripMargin
       }
 
-
       assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
       assertMetric(mergeExec, "numTargetRowsInserted", 0)
       assertMetric(mergeExec, "numTargetRowsUpdated", 2)
@@ -1901,6 +1922,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(3, 300, "hr"),
           Row(4, 400, "marketing"),
           Row(5, -1, "executive"))) // updated
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "2")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1947,6 +1979,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(4, 400, "marketing"))
           // Row(5, 500, "executive") deleted
       )
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "2")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
     }
   }
 
@@ -1994,6 +2037,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(4, 400, "marketing"),
           Row(5, -1, "executive"), // updated
           Row(6, -1, "dummy"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "2")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -2032,7 +2086,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
       assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
       assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1)
 
-
       checkAnswer(
         sql(s"SELECT * FROM $tableNameAsString"),
         Seq(
@@ -2042,6 +2095,62 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(4, 400, "marketing"),
           // Row(5, 500, "executive") deleted
           Row(6, -1, "dummy"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "2")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+    }
+  }
+
+  test("SPARK-52689: V2 write metrics for merge") {
+    Seq("true", "false").foreach { aqeEnabled: String =>
+      withTempView("source") {
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
+          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+            """{ "pk": 1, "salary": 100, "dep": "hr" }
+              |{ "pk": 2, "salary": 200, "dep": "software" }
+              |{ "pk": 3, "salary": 300, "dep": "hr" }
+              |{ "pk": 4, "salary": 400, "dep": "marketing" }
+              |{ "pk": 5, "salary": 500, "dep": "executive" }
+              |""".stripMargin)
+
+          val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
+          sourceDF.createOrReplaceTempView("source")
+
+          sql(
+            s"""MERGE INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED AND salary < 200 THEN
+               | DELETE
+               |WHEN NOT MATCHED AND s.pk < 10 THEN
+               | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
+               |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+               | DELETE
+               |""".stripMargin
+          )
+
+          val table = catalog.loadTable(ident)
+          val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+          assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+          assert(commitProps("merge.numTargetRowsInserted") === "1")
+          assert(commitProps("merge.numTargetRowsUpdated") === "0")
+          assert(commitProps("merge.numTargetRowsDeleted") === "2")
+          assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+          assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
+          assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+          assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+
+          sql(s"DROP TABLE $tableNameAsString")
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
