This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 56661c2ad195 [SPARK-53891][SQL] Model DSV2 Commit Write Summary API
56661c2ad195 is described below
commit 56661c2ad195da9ca44c9d0344e9741bb3101096
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Oct 23 16:43:40 2025 -0700
[SPARK-53891][SQL] Model DSV2 Commit Write Summary API
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/51377 added a DataSourceV2 API that
sends operation metrics along with the commit as a map of String to Long.
This change replaces that map with a proper model.
Suggested by aokolnychyi.
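For connector authors, the new overload looks roughly like the sketch below. `LoggingBatchWrite`,
its constructor parameter, and the println are hypothetical; `BatchWrite`, `WriteSummary`, and
`MergeSummary` are the interfaces added or changed by this patch.

    import org.apache.spark.sql.connector.write.{
      BatchWrite, DataWriterFactory, MergeSummary, PhysicalWriteInfo,
      WriterCommitMessage, WriteSummary}

    // Hypothetical connector-side BatchWrite that inspects the typed summary
    // delivered with the commit; writer-factory wiring is passed in for brevity.
    class LoggingBatchWrite(factory: DataWriterFactory) extends BatchWrite {

      override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
        factory

      override def commit(messages: Array[WriterCommitMessage]): Unit = {
        // Apply the commit messages to the table as before.
      }

      // New overload: Spark passes a typed WriteSummary instead of a Map[String, Long].
      override def commit(messages: Array[WriterCommitMessage], summary: WriteSummary): Unit = {
        summary match {
          case m: MergeSummary =>
            println(s"MERGE updated ${m.numTargetRowsUpdated()} and " +
              s"deleted ${m.numTargetRowsDeleted()} target rows")
          case _ => // other operations carry no summary fields yet
        }
        commit(messages)
      }

      override def abort(messages: Array[WriterCommitMessage]): Unit = {}
    }

Because the new `commit(messages, summary)` overload has a default that delegates to
`commit(messages)`, existing connectors keep compiling and behaving as before.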
### Why are the changes needed?
It is cleaner to model the metrics as a proper object: it makes clear exactly
what information Spark sends, and it accommodates future cases where metrics may
not be long values.
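As a before/after sketch of the commit call (`CommitShapes`, `oldStyle`, and `newStyle` are
illustrative names; the map key comes from the javadoc removed below, and the old overload is
shown commented out because this patch replaces it):

    import java.lang.{Long => JLong}
    import scala.jdk.CollectionConverters._
    import org.apache.spark.sql.connector.write.{BatchWrite, MergeSummary, WriterCommitMessage}

    object CommitShapes {
      // Before: stringly-typed keys, values fixed to Long.
      def oldStyle(write: BatchWrite, messages: Array[WriterCommitMessage]): Unit = {
        val metrics: java.util.Map[String, JLong] =
          Map("merge.numTargetRowsUpdated" -> JLong.valueOf(1L)).asJava
        // write.commit(messages, metrics)  // removed overload, shown for comparison
      }

      // After: the typed model makes the available fields explicit, and a future
      // field does not have to be a long.
      def newStyle(
          write: BatchWrite,
          messages: Array[WriterCommitMessage],
          summary: MergeSummary): Unit = {
        write.commit(messages, summary)
        val updated: Long = summary.numTargetRowsUpdated() // typed accessor, no key strings
      }
    }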
### Does this PR introduce _any_ user-facing change?
No, this is an unreleased DSV2 API.
### How was this patch tested?
Existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52595 from szehon-ho/SPARK-53891.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/connector/write/BatchWrite.java | 32 +----
.../spark/sql/connector/write/MergeSummary.java | 69 ++++++++++
.../spark/sql/connector/write/WriteSummary.java | 29 +++++
.../sql/connector/write/MergeSummaryImpl.scala | 33 +++++
.../sql/connector/catalog/InMemoryBaseTable.scala | 13 +-
.../catalog/InMemoryRowLevelOperationTable.scala | 15 +--
.../datasources/v2/WriteToDataSourceV2Exec.scala | 30 +++--
.../sql/connector/MergeIntoTableSuiteBase.scala | 140 ++++++++++-----------
8 files changed, 234 insertions(+), 127 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index c8febd0fe493..44fc5f9d794b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -19,8 +19,6 @@ package org.apache.spark.sql.connector.write;
import org.apache.spark.annotation.Evolving;
-import java.util.Map;
-
/**
* An interface that defines how to write the data to data source for batch processing.
* <p>
@@ -91,7 +89,7 @@ public interface BatchWrite {
void commit(WriterCommitMessage[] messages);
/**
- * Commits this writing job with a list of commit messages and operation metrics.
+ * Commits this writing job with a list of commit messages and a write summary.
* <p>
* If this method fails (by throwing an exception), this writing job is considered to have
* failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
@@ -105,31 +103,11 @@ public interface BatchWrite {
* <p>
* @param messages a list of commit messages from successful data writers, produced by
*                 {@link DataWriter#commit()}.
- * @param metrics a map of operation metrics collected from the query producing write.
- *                The keys will be prefixed by operation type, eg `merge`.
- *                <p>
- *                Currently supported metrics are:
- *                <ul>
- *                  <li>Operation Type = `merge`
- *                  <ul>
- *                    <li>`numTargetRowsCopied`: number of target rows copied unmodified because they did not match any action</li>
- *                    <li>`numTargetRowsDeleted`: number of target rows deleted</li>
- *                    <li>`numTargetRowsUpdated`: number of target rows updated</li>
- *                    <li>`numTargetRowsInserted`: number of target rows inserted</li>
- *                    <li>`numTargetRowsMatchedUpdated`: number of target rows updated by a matched clause</li>
- *                    <li>`numTargetRowsMatchedDeleted`: number of target rows deleted by a matched clause</li>
- *                    <li>`numTargetRowsNotMatchedBySourceUpdated`: number of target rows updated by a not matched by source clause</li>
- *                    <li>`numTargetRowsNotMatchedBySourceDeleted`: number of target rows deleted by a not matched by source clause</li>
- *                  </ul>
- *                  </li>
- *                </ul>
+ * @param summary an informational summary collected on a best-effort basis from the operation
+ *                producing the write. Currently supported summary fields are provided through
+ *                implementations of {@link WriteSummary}.
*/
- default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
+ default void commit(WriterCommitMessage[] messages, WriteSummary summary) {
commit(messages);
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
new file mode 100644
index 000000000000..a759e6693edb
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Provides an informational summary of the MERGE operation producing the write.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface MergeSummary extends WriteSummary {
+
+ /**
+ * Returns the number of target rows copied unmodified because they did not match any action.
+ */
+ long numTargetRowsCopied();
+
+ /**
+ * Returns the number of target rows deleted.
+ */
+ long numTargetRowsDeleted();
+
+ /**
+ * Returns the number of target rows updated.
+ */
+ long numTargetRowsUpdated();
+
+ /**
+ * Returns the number of target rows inserted.
+ */
+ long numTargetRowsInserted();
+
+ /**
+ * Returns the number of target rows updated by a matched clause.
+ */
+ long numTargetRowsMatchedUpdated();
+
+ /**
+ * Returns the number of target rows deleted by a matched clause.
+ */
+ long numTargetRowsMatchedDeleted();
+
+ /**
+ * Returns the number of target rows updated by a not matched by source clause.
+ */
+ long numTargetRowsNotMatchedBySourceUpdated();
+
+ /**
+ * Returns the number of target rows deleted by a not matched by source clause.
+ */
+ long numTargetRowsNotMatchedBySourceDeleted();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java
new file mode 100644
index 000000000000..a8ae462fd3c0
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * An informational summary of the operation producing the write.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface WriteSummary {
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
new file mode 100644
index 000000000000..911749072c43
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write
+
+/**
+ * Implementation of [[MergeSummary]] that provides a summary of a MERGE operation.
+ */
+private[sql] case class MergeSummaryImpl(
+ numTargetRowsCopied: Long,
+ numTargetRowsDeleted: Long,
+ numTargetRowsUpdated: Long,
+ numTargetRowsInserted: Long,
+ numTargetRowsMatchedUpdated: Long,
+ numTargetRowsMatchedDeleted: Long,
+ numTargetRowsNotMatchedBySourceUpdated: Long,
+ numTargetRowsNotMatchedBySourceDeleted: Long)
+ extends MergeSummary {
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 14941de1cc2c..5faf71551586 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -655,8 +655,6 @@ abstract class InMemoryBaseTable(
protected abstract class TestBatchWrite extends BatchWrite {
- var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String]
-
override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
new BufferedRowsWriterFactory(CatalogV2Util.v2ColumnsToStructType(columns()))
}
@@ -668,8 +666,7 @@ abstract class InMemoryBaseTable(
override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
withData(messages.map(_.asInstanceOf[BufferedRows]))
- commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
- commitProperties.clear()
+ commits += Commit(Instant.now().toEpochMilli)
}
}
@@ -678,8 +675,7 @@ abstract class InMemoryBaseTable(
val newData = messages.map(_.asInstanceOf[BufferedRows])
dataMap --= newData.flatMap(_.rows.map(getKey))
withData(newData)
- commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
- commitProperties.clear()
+ commits += Commit(Instant.now().toEpochMilli)
}
}
@@ -687,8 +683,7 @@ abstract class InMemoryBaseTable(
override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
dataMap.clear()
withData(messages.map(_.asInstanceOf[BufferedRows]))
- commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
- commitProperties.clear()
+ commits += Commit(Instant.now().toEpochMilli)
}
}
@@ -1045,7 +1040,7 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric {
override def value(): Long = value
}
-case class Commit(id: Long, properties: Map[String, String])
+case class Commit(id: Long, writeSummary: Option[WriteSummary] = None)
sealed trait Operation
case object Write extends Operation
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index 9c1b1366837f..2f7cad599215 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.connector.catalog
-import java.{lang, util}
import java.time.Instant
-
-import scala.jdk.CollectionConverters._
+import java.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
@@ -28,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage, WriteSummary}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -116,14 +114,9 @@ class InMemoryRowLevelOperationTable(
abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
- override def commit(messages: Array[WriterCommitMessage],
-                     metrics: util.Map[String, lang.Long]): Unit = {
- metrics.asScala.map {
- case (key, value) => commitProperties += key -> String.valueOf(value)
- }
+ override def commit(messages: Array[WriterCommitMessage], metrics: WriteSummary): Unit = {
commit(messages)
- commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
- commitProperties.clear()
+ commits += Commit(Instant.now().toEpochMilli, Some(metrics))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 4904e3d60dc9..2a3a3441accc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,9 +17,6 @@
package org.apache.spark.sql.execution.datasources.v2
-import java.lang
-import java.util
-
import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -34,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -455,9 +452,12 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
}
)
- val operationMetrics = getOperationMetrics(query)
+ val writeSummary = getWriteSummary(query)
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE,
batchWrite)} is committing.")
- batchWrite.commit(messages, operationMetrics)
+ writeSummary match {
+ case Some(summary) => batchWrite.commit(messages, summary)
+ case None => batchWrite.commit(messages)
+ }
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE,
batchWrite)} committed.")
commitProgress =
Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
} catch {
@@ -480,10 +480,20 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
Nil
}
- private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
- collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
- n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
- }.getOrElse(Map.empty[String, lang.Long]).asJava
+ private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
+ collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
+ val metrics = n.metrics
+ MergeSummaryImpl(
+ metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
+ metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L),
+ metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L),
+ metrics.get("numTargetRowsInserted").map(_.value).getOrElse(-1L),
+ metrics.get("numTargetRowsMatchedUpdated").map(_.value).getOrElse(-1L),
+ metrics.get("numTargetRowsMatchedDeleted").map(_.value).getOrElse(-1L),
+ metrics.get("numTargetRowsNotMatchedBySourceUpdated").map(_.value).getOrElse(-1L),
+ metrics.get("numTargetRowsNotMatchedBySourceDeleted").map(_.value).getOrElse(-1L)
+ )
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 7d879e2c9a5e..2e175951851a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, I
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, ColumnDefaultValue, InMemoryTable, TableInfo}
import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
+import org.apache.spark.sql.connector.write.MergeSummary
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
@@ -1812,16 +1813,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Row(2, 200, "software"),
Row(3, 300, "hr")))
- val table = catalog.loadTable(ident)
- val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
- assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2"))
- assert(commitProps("merge.numTargetRowsInserted") === "0")
- assert(commitProps("merge.numTargetRowsUpdated") === "1")
- assert(commitProps("merge.numTargetRowsDeleted") === "0")
- assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
- assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L))
+ assert(mergeSummary.numTargetRowsInserted === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 1L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
@@ -1868,16 +1868,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Row(3, 300, "hr"),
Row(5, 400, "executive"))) // inserted
- val table = catalog.loadTable(ident)
- val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
- assert(commitProps("merge.numTargetRowsCopied") === "0")
- assert(commitProps("merge.numTargetRowsInserted") === "1")
- assert(commitProps("merge.numTargetRowsUpdated") === "0")
- assert(commitProps("merge.numTargetRowsDeleted") === "0")
- assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
- assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === 0L)
+ assert(mergeSummary.numTargetRowsInserted === 1L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
@@ -1923,16 +1922,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Row(4, 400, "marketing"),
Row(5, -1, "executive"))) // updated
- val table = catalog.loadTable(ident)
- val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
- assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
- assert(commitProps("merge.numTargetRowsInserted") === "0")
- assert(commitProps("merge.numTargetRowsUpdated") === "2")
- assert(commitProps("merge.numTargetRowsDeleted") === "0")
- assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
- assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeSummary.numTargetRowsInserted === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 2L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
@@ -1980,16 +1978,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
// Row(5, 500, "executive") deleted
)
- val table = catalog.loadTable(ident)
- val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
- assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
- assert(commitProps("merge.numTargetRowsInserted") === "0")
- assert(commitProps("merge.numTargetRowsUpdated") === "0")
- assert(commitProps("merge.numTargetRowsDeleted") === "2")
- assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
- assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeSummary.numTargetRowsInserted === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 2L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
}
}
@@ -2038,16 +2035,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Row(5, -1, "executive"), // updated
Row(6, -1, "dummy"))) // inserted
- val table = catalog.loadTable(ident)
- val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
- assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
- assert(commitProps("merge.numTargetRowsInserted") === "1")
- assert(commitProps("merge.numTargetRowsUpdated") === "2")
- assert(commitProps("merge.numTargetRowsDeleted") === "0")
- assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
- assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeSummary.numTargetRowsInserted === 1L)
+ assert(mergeSummary.numTargetRowsUpdated === 2L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 1L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
@@ -2096,16 +2092,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
// Row(5, 500, "executive") deleted
Row(6, -1, "dummy"))) // inserted
- val table = catalog.loadTable(ident)
- val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
- assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
- assert(commitProps("merge.numTargetRowsInserted") === "1")
- assert(commitProps("merge.numTargetRowsUpdated") === "0")
- assert(commitProps("merge.numTargetRowsDeleted") === "2")
- assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
- assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeSummary.numTargetRowsInserted === 1L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 2L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 1L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L)
}
}
@@ -2137,16 +2132,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
|""".stripMargin
)
- val table = catalog.loadTable(ident)
- val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
- assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
- assert(commitProps("merge.numTargetRowsInserted") === "1")
- assert(commitProps("merge.numTargetRowsUpdated") === "0")
- assert(commitProps("merge.numTargetRowsDeleted") === "2")
- assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
- assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
- assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
+ val mergeMetrics = getMergeSummary()
+ assert(mergeMetrics.numTargetRowsCopied === (if (deltaMerge) 0L else 3L))
+ assert(mergeMetrics.numTargetRowsInserted === 1L)
+ assert(mergeMetrics.numTargetRowsUpdated === 0L)
+ assert(mergeMetrics.numTargetRowsDeleted === 2L)
+ assert(mergeMetrics.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeMetrics.numTargetRowsMatchedDeleted === 1L)
+ assert(mergeMetrics.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeMetrics.numTargetRowsNotMatchedBySourceDeleted === 1L)
sql(s"DROP TABLE $tableNameAsString")
}
@@ -3274,6 +3268,12 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}
+ private def getMergeSummary(): MergeSummary = {
+ val table = catalog.loadTable(ident)
+ table.asInstanceOf[InMemoryTable].commits.last.writeSummary.get
+ .asInstanceOf[MergeSummary]
+ }
+
private def assertNoLeftBroadcastOrReplication(query: String): Unit = {
val plan = executeAndKeepPlan {
sql(query)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]