This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 56661c2ad195 [SPARK-53891][SQL] Model DSV2 Commit Write Summary API
56661c2ad195 is described below

commit 56661c2ad195da9ca44c9d0344e9741bb3101096
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Oct 23 16:43:40 2025 -0700

    [SPARK-53891][SQL] Model DSV2 Commit Write Summary API
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/51377 added a DataSourceV2 API
    that sends operation metrics along with the commit via a map of String
    to Long. Change this to a proper model.
    
    Suggestion from aokolnychyi
    
    ### Why are the changes needed?
    It is cleaner to model this as a proper object: it makes clear what
    information Spark sends, and it can handle future cases where metrics
    may not be long values.
    
    ### Does this PR introduce _any_ user-facing change?
    No; the DSV2 API involved is unreleased.
    
    ### How was this patch tested?
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52595 from szehon-ho/SPARK-53891.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/connector/write/BatchWrite.java      |  32 +----
 .../spark/sql/connector/write/MergeSummary.java    |  69 ++++++++++
 .../spark/sql/connector/write/WriteSummary.java    |  29 +++++
 .../sql/connector/write/MergeSummaryImpl.scala     |  33 +++++
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  13 +-
 .../catalog/InMemoryRowLevelOperationTable.scala   |  15 +--
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  30 +++--
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 140 ++++++++++-----------
 8 files changed, 234 insertions(+), 127 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index c8febd0fe493..44fc5f9d794b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -19,8 +19,6 @@ package org.apache.spark.sql.connector.write;
 
 import org.apache.spark.annotation.Evolving;
 
-import java.util.Map;
-
 /**
  * An interface that defines how to write the data to data source for batch processing.
  * <p>
@@ -91,7 +89,7 @@ public interface BatchWrite {
   void commit(WriterCommitMessage[] messages);
 
   /**
-   * Commits this writing job with a list of commit messages and operation metrics.
+   * Commits this writing job with a list of commit messages and a write summary.
    * <p>
   * If this method fails (by throwing an exception), this writing job is considered to have
   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
@@ -105,31 +103,11 @@ public interface BatchWrite {
    * <p>
   * @param messages a list of commit messages from successful data writers, produced by
    *                 {@link DataWriter#commit()}.
-   * @param metrics a map of operation metrics collected from the query producing write.
-   *                The keys will be prefixed by operation type, eg `merge`.
-   *                <p>
-   *                Currently supported metrics are:
-   *                <ul>
-   *                  <li>Operation Type = `merge`
-   *                    <ul>
-   *                      <li>`numTargetRowsCopied`: number of target rows copied unmodified because
-   *                      they did not match any action</li>
-   *                      <li>`numTargetRowsDeleted`: number of target rows deleted</li>
-   *                      <li>`numTargetRowsUpdated`: number of target rows updated</li>
-   *                      <li>`numTargetRowsInserted`: number of target rows inserted</li>
-   *                      <li>`numTargetRowsMatchedUpdated`: number of target rows updated by a
-   *                      matched clause</li>
-   *                      <li>`numTargetRowsMatchedDeleted`: number of target rows deleted by a
-   *                      matched clause</li>
-   *                      <li>`numTargetRowsNotMatchedBySourceUpdated`: number of target rows
-   *                      updated by a not matched by source clause</li>
-   *                      <li>`numTargetRowsNotMatchedBySourceDeleted`: number of target rows
-   *                      deleted by a not matched by source clause</li>
-   *                    </ul>
-   *                  </li>
-   *                </ul>
+   * @param summary an informational summary collected on a best-effort basis from the
+   *                operation producing the write. The currently supported summary fields
+   *                are provided through implementations of {@link WriteSummary}.
    */
-  default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
+  default void commit(WriterCommitMessage[] messages, WriteSummary summary) {
     commit(messages);
   }
 
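For illustration, here is a minimal connector-side sketch of the new overload. Only the org.apache.spark.sql.connector.write types come from this patch; the class name ExampleBatchWrite and the commit-property idea are hypothetical.

```java
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.MergeSummary;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteSummary;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Hypothetical connector: shows where the typed summary replaces the old
// Map<String, Long> whose keys were prefixed by operation type ("merge.").
class ExampleBatchWrite implements BatchWrite {

  @Override
  public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
    throw new UnsupportedOperationException("not relevant to this sketch");
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // finalize the write, e.g. publish data files and table metadata
  }

  @Override
  public void commit(WriterCommitMessage[] messages, WriteSummary summary) {
    // Downcast to the operation-specific summary instead of parsing
    // string keys such as "merge.numTargetRowsUpdated".
    if (summary instanceof MergeSummary) {
      MergeSummary merge = (MergeSummary) summary;
      long updated = merge.numTargetRowsUpdated();
      long deleted = merge.numTargetRowsDeleted();
      // e.g. attach updated/deleted counts as commit properties
    }
    commit(messages);
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // clean up any partially written data
  }
}
```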
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
new file mode 100644
index 000000000000..a759e6693edb
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Provides an informational summary of the MERGE operation producing the write.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface MergeSummary extends WriteSummary {
+
+  /**
+   * Returns the number of target rows copied unmodified because they did not match any action.
+   */
+  long numTargetRowsCopied();
+
+  /**
+   * Returns the number of target rows deleted.
+   */
+  long numTargetRowsDeleted();
+
+  /**
+   * Returns the number of target rows updated.
+   */
+  long numTargetRowsUpdated();
+
+  /**
+   * Returns the number of target rows inserted.
+   */
+  long numTargetRowsInserted();
+
+  /**
+   * Returns the number of target rows updated by a matched clause.
+   */
+  long numTargetRowsMatchedUpdated();
+
+  /**
+   * Returns the number of target rows deleted by a matched clause.
+   */
+  long numTargetRowsMatchedDeleted();
+
+  /**
+   * Returns the number of target rows updated by a not matched by source clause.
+   */
+  long numTargetRowsNotMatchedBySourceUpdated();
+
+  /**
+   * Returns the number of target rows deleted by a not matched by source clause.
+   */
+  long numTargetRowsNotMatchedBySourceDeleted();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java
new file mode 100644
index 000000000000..a8ae462fd3c0
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * An informational summary of the operation producing the write.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface WriteSummary {
+}
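The empty marker interface leaves room for the "metrics may not be long values" case mentioned in the PR description. A hypothetical future extension (not part of this patch) could look like:

```java
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.write.WriteSummary;

// Hypothetical only: a summary whose fields are not all longs, which the
// previous Map<String, Long> API could not express.
@Evolving
interface ExampleRewriteSummary extends WriteSummary {
  long numFilesRewritten();
  String rewriteStrategy(); // a non-long value, e.g. "binpack"
}
```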
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
new file mode 100644
index 000000000000..911749072c43
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write
+
+/**
+ * Implementation of [[MergeSummary]] that provides a MERGE operation summary.
+ */
+private[sql] case class MergeSummaryImpl(
+    numTargetRowsCopied: Long,
+    numTargetRowsDeleted: Long,
+    numTargetRowsUpdated: Long,
+    numTargetRowsInserted: Long,
+    numTargetRowsMatchedUpdated: Long,
+    numTargetRowsMatchedDeleted: Long,
+    numTargetRowsNotMatchedBySourceUpdated: Long,
+    numTargetRowsNotMatchedBySourceDeleted: Long)
+  extends MergeSummary {
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 14941de1cc2c..5faf71551586 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -655,8 +655,6 @@ abstract class InMemoryBaseTable(
 
   protected abstract class TestBatchWrite extends BatchWrite {
 
-    var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String]
-
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
       new BufferedRowsWriterFactory(CatalogV2Util.v2ColumnsToStructType(columns()))
     }
@@ -668,8 +666,7 @@ abstract class InMemoryBaseTable(
 
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       withData(messages.map(_.asInstanceOf[BufferedRows]))
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli)
     }
   }
 
@@ -678,8 +675,7 @@ abstract class InMemoryBaseTable(
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       dataMap --= newData.flatMap(_.rows.map(getKey))
       withData(newData)
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli)
     }
   }
 
@@ -687,8 +683,7 @@ abstract class InMemoryBaseTable(
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       dataMap.clear()
       withData(messages.map(_.asInstanceOf[BufferedRows]))
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli)
     }
   }
 
@@ -1045,7 +1040,7 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric {
   override def value(): Long = value
 }
 
-case class Commit(id: Long, properties: Map[String, String])
+case class Commit(id: Long, writeSummary: Option[WriteSummary] = None)
 
 sealed trait Operation
 case object Write extends Operation
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index 9c1b1366837f..2f7cad599215 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import java.{lang, util}
 import java.time.Instant
-
-import scala.jdk.CollectionConverters._
+import java.util
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
@@ -28,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -116,14 +114,9 @@ class InMemoryRowLevelOperationTable(
 
   abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
 
-    override def commit(messages: Array[WriterCommitMessage],
-                                            metrics: util.Map[String, lang.Long]): Unit = {
-      metrics.asScala.map {
-        case (key, value) => commitProperties += key -> String.valueOf(value)
-      }
+    override def commit(messages: Array[WriterCommitMessage], metrics: WriteSummary): Unit = {
       commit(messages)
-      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
-      commitProperties.clear()
+      commits += Commit(Instant.now().toEpochMilli, Some(metrics))
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 4904e3d60dc9..2a3a3441accc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.lang
-import java.util
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -34,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -455,9 +452,12 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
         }
       )
 
-      val operationMetrics = getOperationMetrics(query)
+      val writeSummary = getWriteSummary(query)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} is committing.")
-      batchWrite.commit(messages, operationMetrics)
+      writeSummary match {
+        case Some(summary) => batchWrite.commit(messages, summary)
+        case None => batchWrite.commit(messages)
+      }
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} committed.")
       commitProgress = 
Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
@@ -480,10 +480,20 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
     Nil
   }
 
-  private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
-    collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
-      n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
-    }.getOrElse(Map.empty[String, lang.Long]).asJava
+  private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
+      val metrics = n.metrics
+      MergeSummaryImpl(
+        metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsInserted").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsMatchedUpdated").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsMatchedDeleted").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsNotMatchedBySourceUpdated").map(_.value).getOrElse(-1L),
+        metrics.get("numTargetRowsNotMatchedBySourceDeleted").map(_.value).getOrElse(-1L)
+      )
+    }
   }
 }
 
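Note that getWriteSummary above substitutes -1L whenever a MergeRowsExec metric is absent, so a sink should treat negative counts as unknown rather than as real row counts. A small hypothetical consumer-side guard (not part of this patch):

```java
import java.util.OptionalLong;
import org.apache.spark.sql.connector.write.MergeSummary;

// Hypothetical helper: -1L marks a missing driver-side metric, so expose
// such counts as "unknown" instead of a real value.
final class MergeSummaryCounts {
  private MergeSummaryCounts() {}

  static OptionalLong updatedRows(MergeSummary summary) {
    long value = summary.numTargetRowsUpdated();
    return value >= 0 ? OptionalLong.of(value) : OptionalLong.empty();
  }
}
```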
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 7d879e2c9a5e..2e175951851a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, I
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, ColumnDefaultValue, InMemoryTable, TableInfo}
 import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
+import org.apache.spark.sql.connector.write.MergeSummary
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
@@ -1812,16 +1813,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(2, 200, "software"),
           Row(3, 300, "hr")))
 
-      val table = catalog.loadTable(ident)
-      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
-      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2"))
-      assert(commitProps("merge.numTargetRowsInserted") === "0")
-      assert(commitProps("merge.numTargetRowsUpdated") === "1")
-      assert(commitProps("merge.numTargetRowsDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
-      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L))
+      assert(mergeSummary.numTargetRowsInserted === 0L)
+      assert(mergeSummary.numTargetRowsUpdated === 1L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
@@ -1868,16 +1868,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(3, 300, "hr"),
           Row(5, 400, "executive"))) // inserted
 
-      val table = catalog.loadTable(ident)
-      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
-      assert(commitProps("merge.numTargetRowsCopied") === "0")
-      assert(commitProps("merge.numTargetRowsInserted") === "1")
-      assert(commitProps("merge.numTargetRowsUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === 0L)
+      assert(mergeSummary.numTargetRowsInserted === 1L)
+      assert(mergeSummary.numTargetRowsUpdated === 0L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
@@ -1923,16 +1922,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(4, 400, "marketing"),
           Row(5, -1, "executive"))) // updated
 
-      val table = catalog.loadTable(ident)
-      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
-      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
-      assert(commitProps("merge.numTargetRowsInserted") === "0")
-      assert(commitProps("merge.numTargetRowsUpdated") === "2")
-      assert(commitProps("merge.numTargetRowsDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
-      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+      assert(mergeSummary.numTargetRowsInserted === 0L)
+      assert(mergeSummary.numTargetRowsUpdated === 2L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
@@ -1980,16 +1978,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           // Row(5, 500, "executive") deleted
       )
 
-      val table = catalog.loadTable(ident)
-      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
-      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
-      assert(commitProps("merge.numTargetRowsInserted") === "0")
-      assert(commitProps("merge.numTargetRowsUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsDeleted") === "2")
-      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+      assert(mergeSummary.numTargetRowsInserted === 0L)
+      assert(mergeSummary.numTargetRowsUpdated === 0L)
+      assert(mergeSummary.numTargetRowsDeleted === 2L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
     }
   }
 
@@ -2038,16 +2035,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           Row(5, -1, "executive"), // updated
           Row(6, -1, "dummy"))) // inserted
 
-      val table = catalog.loadTable(ident)
-      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
-      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
-      assert(commitProps("merge.numTargetRowsInserted") === "1")
-      assert(commitProps("merge.numTargetRowsUpdated") === "2")
-      assert(commitProps("merge.numTargetRowsDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
-      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+      assert(mergeSummary.numTargetRowsInserted === 1L)
+      assert(mergeSummary.numTargetRowsUpdated === 2L)
+      assert(mergeSummary.numTargetRowsDeleted === 0L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
     }
   }
 
@@ -2096,16 +2092,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
           // Row(5, 500, "executive") deleted
           Row(6, -1, "dummy"))) // inserted
 
-      val table = catalog.loadTable(ident)
-      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
-      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
-      assert(commitProps("merge.numTargetRowsInserted") === "1")
-      assert(commitProps("merge.numTargetRowsUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsDeleted") === "2")
-      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
-      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+      val mergeSummary = getMergeSummary()
+      assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+      assert(mergeSummary.numTargetRowsInserted === 1L)
+      assert(mergeSummary.numTargetRowsUpdated === 0L)
+      assert(mergeSummary.numTargetRowsDeleted === 2L)
+      assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+      assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+      assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
     }
   }
 
@@ -2137,16 +2132,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
                |""".stripMargin
           )
 
-          val table = catalog.loadTable(ident)
-          val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
-          assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
-          assert(commitProps("merge.numTargetRowsInserted") === "1")
-          assert(commitProps("merge.numTargetRowsUpdated") === "0")
-          assert(commitProps("merge.numTargetRowsDeleted") === "2")
-          assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
-          assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
-          assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
-          assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+          val mergeMetrics = getMergeSummary()
+          assert(mergeMetrics.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+          assert(mergeMetrics.numTargetRowsInserted === 1L)
+          assert(mergeMetrics.numTargetRowsUpdated === 0L)
+          assert(mergeMetrics.numTargetRowsDeleted === 2L)
+          assert(mergeMetrics.numTargetRowsMatchedUpdated === 0L)
+          assert(mergeMetrics.numTargetRowsMatchedDeleted === 1L)
+          assert(mergeMetrics.numTargetRowsNotMatchedBySourceUpdated === 0L)
+          assert(mergeMetrics.numTargetRowsNotMatchedBySourceDeleted === 1L)
 
           sql(s"DROP TABLE $tableNameAsString")
         }
@@ -3274,6 +3268,12 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
     }
   }
 
+  private def getMergeSummary(): MergeSummary = {
+    val table = catalog.loadTable(ident)
+    table.asInstanceOf[InMemoryTable].commits.last.writeSummary.get
+      .asInstanceOf[MergeSummary]
+  }
+
   private def assertNoLeftBroadcastOrReplication(query: String): Unit = {
     val plan = executeAndKeepPlan {
       sql(query)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
