This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 570b32500844 [SPARK-52689][SQL] Send DML Metrics to V2Write
570b32500844 is described below

commit 570b32500844893b4ed23a934b584248918e7480
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Mon Jul 28 17:29:35 2025 +0800

    [SPARK-52689][SQL] Send DML Metrics to V2Write

    ### What changes were proposed in this pull request?
    Send some DML execution metrics (i.e. from MergeRowsExec) to the write of these data sources, so they can persist them for debugging purposes.

    ### Why are the changes needed?
    DML row-level operations, i.e. MERGE, UPDATE, and DELETE, are critical functionality of V2 data sources (like Iceberg). It would be nice if we could send some DML metrics to the commit of these data sources, so they can persist them in the commit metadata for debugging purposes.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Unit test

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #51377 from szehon-ho/metric_to_write.

    Lead-authored-by: Szehon Ho <szehon.apa...@gmail.com>
    Co-authored-by: Szehon Ho <sze...@apache.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/connector/write/BatchWrite.java      |  45 ++++++++
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  15 +++
 .../catalog/InMemoryRowLevelOperationTable.scala   |  23 ++++-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  15 ++-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 115 ++++++++++++++++++++-
 5 files changed, 205 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index 8c068928415f..c8febd0fe493 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -19,6 +19,8 @@ package org.apache.spark.sql.connector.write;
 
 import org.apache.spark.annotation.Evolving;
 
+import java.util.Map;
+
 /**
  * An interface that defines how to write the data to data source for batch processing.
  * <p>
@@ -88,6 +90,49 @@ public interface BatchWrite {
    */
   void commit(WriterCommitMessage[] messages);
 
+  /**
+   * Commits this writing job with a list of commit messages and operation metrics.
+   * <p>
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
+   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
+   * <p>
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * tasks may have committed successfully and one successful commit message per task will be
+   * passed to this commit method. The remaining commit messages are ignored by Spark.
+   * <p>
+   * @param messages a list of commit messages from successful data writers, produced by
+   *                 {@link DataWriter#commit()}.
+   * @param metrics a map of operation metrics collected from the query producing the write.
+   *                The keys will be prefixed by operation type, e.g. `merge`.
+   * <p>
+   * Currently supported metrics are:
+   * <ul>
+   *   <li>Operation Type = `merge`
+   *     <ul>
+   *       <li>`numTargetRowsCopied`: number of target rows copied unmodified because
+   *       they did not match any action</li>
+   *       <li>`numTargetRowsDeleted`: number of target rows deleted</li>
+   *       <li>`numTargetRowsUpdated`: number of target rows updated</li>
+   *       <li>`numTargetRowsInserted`: number of target rows inserted</li>
+   *       <li>`numTargetRowsMatchedUpdated`: number of target rows updated by a
+   *       matched clause</li>
+   *       <li>`numTargetRowsMatchedDeleted`: number of target rows deleted by a
+   *       matched clause</li>
+   *       <li>`numTargetRowsNotMatchedBySourceUpdated`: number of target rows
+   *       updated by a not matched by source clause</li>
+   *       <li>`numTargetRowsNotMatchedBySourceDeleted`: number of target rows
+   *       deleted by a not matched by source clause</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   */
+  default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
+    commit(messages);
+  }
+
   /**
    * Aborts this writing job because some data writers are failed and keep failing when retry,
    * or the Spark job fails with some unknown reasons,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 4efb08a67829..a3b10054a359 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -23,6 +23,7 @@ import java.util
 import java.util.OptionalLong
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import com.google.common.base.Objects
@@ -152,6 +153,8 @@ abstract class InMemoryBaseTable(
 
   // The key `Seq[Any]` is the partition values, value is a set of splits, each with a set of rows.
   val dataMap: mutable.Map[Seq[Any], Seq[BufferedRows]] = mutable.Map.empty
 
+  val commits: ListBuffer[Commit] = ListBuffer[Commit]()
+
   def data: Array[BufferedRows] = dataMap.values.flatten.toArray
 
   def rows: Seq[InternalRow] = dataMap.values.flatten.flatMap(_.rows).toSeq
@@ -616,6 +619,9 @@ abstract class InMemoryBaseTable(
   }
 
   protected abstract class TestBatchWrite extends BatchWrite {
+
+    var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String]
+
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
       BufferedRowsWriterFactory
     }
@@ -624,8 +630,11 @@ abstract class InMemoryBaseTable(
   }
 
   class Append(val info: LogicalWriteInfo) extends TestBatchWrite {
+
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -634,6 +643,8 @@ abstract class InMemoryBaseTable(
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       dataMap --= newData.flatMap(_.rows.map(getKey))
       withData(newData)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -641,6 +652,8 @@ abstract class InMemoryBaseTable(
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       dataMap.clear()
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -882,6 +895,8 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric {
   override def value(): Long = value
 }
 
+case class Commit(id: Long, properties: Map[String, String])
+
 sealed trait Operation
 case object Write extends Operation
 case object Delete extends Operation
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index aeb807768b07..100570a67a07 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -17,7 +17,10 @@ package org.apache.spark.sql.connector.catalog
 
-import java.util
+import java.{lang, util}
+import java.time.Instant
+
+import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
@@ -111,7 +114,21 @@ class InMemoryRowLevelOperationTable(
     override def description(): String = "InMemoryPartitionReplaceOperation"
   }
 
-  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan) extends TestBatchWrite {
+  abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
+
+    override def commit(messages: Array[WriterCommitMessage],
+        metrics: util.Map[String, lang.Long]): Unit = {
+      metrics.asScala.map {
+        case (key, value) => commitProperties += key -> String.valueOf(value)
+      }
+      commit(messages)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
+    }
+  }
+
+  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
+    extends RowLevelOperationBatchWrite {
 
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       val newData = messages.map(_.asInstanceOf[BufferedRows])
@@ -165,7 +182,7 @@
 class InMemoryRowLevelOperationTable(
     }
   }
 
-  private object TestDeltaBatchWrite extends DeltaBatchWrite {
+  private object TestDeltaBatchWrite extends RowLevelOperationBatchWrite with DeltaBatchWrite{
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DeltaWriterFactory = {
       DeltaBufferedRowsWriterFactory
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 193662718ce4..5801f0580de2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.datasources.v2
 
+import java.lang
+import java.util
+
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -34,6 +37,7 @@ import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
@@ -398,7 +402,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
 
 /**
  * The base physical plan for writing data into data source v2.
  */
-trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
+trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper {
 
   def query: SparkPlan
 
   def writingTask: WritingSparkTask[_] = DataWritingSparkTask
@@ -451,8 +455,9 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
         }
       )
 
+      val operationMetrics = getOperationMetrics(query)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
-      batchWrite.commit(messages)
+      batchWrite.commit(messages, operationMetrics)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
       commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
@@ -474,6 +479,12 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
       Nil
     }
+
+  private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
+      n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
+    }.getOrElse(Map.empty[String, lang.Long]).asJava
+  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 21b171bee961..e5786619f98f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not}
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, TableInfo}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, InMemoryTable, TableInfo}
 import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1811,6 +1811,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(1, 1000, "hr"), // updated
         Row(2, 200, "software"),
         Row(3, 300, "hr")))
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1856,6 +1867,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(2, 200, "software"),
         Row(3, 300, "hr"),
         Row(5, 400, "executive"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === "0")
+
      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1883,7 +1905,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         |""".stripMargin
     }
-
     assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
     assertMetric(mergeExec, "numTargetRowsInserted", 0)
     assertMetric(mergeExec, "numTargetRowsUpdated", 2)
@@ -1901,6 +1922,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(3, 300, "hr"),
         Row(4, 400, "marketing"),
         Row(5, -1, "executive"))) // updated
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "2")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1947,6 +1979,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(4, 400, "marketing")) // Row(5, 500, "executive") deleted
       )
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "0")
+      assert(commitProps("merge.numTargetRowsUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsDeleted") === "2")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1")
     }
   }
 
@@ -1994,6 +2037,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(4, 400, "marketing"),
         Row(5, -1, "executive"), // updated
         Row(6, -1, "dummy"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("merge.numTargetRowsInserted") === "1")
+      assert(commitProps("merge.numTargetRowsUpdated") === "2")
+      assert(commitProps("merge.numTargetRowsDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -2032,7 +2086,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
     assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
     assertMetric(mergeExec,
"numTargetRowsNotMatchedBySourceDeleted", 1) - checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq( @@ -2042,6 +2095,62 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(4, 400, "marketing"), // Row(5, 500, "executive") deleted Row(6, -1, "dummy"))) // inserted + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("merge.numTargetRowsInserted") === "1") + assert(commitProps("merge.numTargetRowsUpdated") === "0") + assert(commitProps("merge.numTargetRowsDeleted") === "2") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1") + } + } + + test("SPARK-52689: V2 write metrics for merge") { + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) + + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") + + sql( + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | DELETE + |WHEN NOT MATCHED AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | DELETE + |""".stripMargin + ) + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("merge.numTargetRowsInserted") === "1") + assert(commitProps("merge.numTargetRowsUpdated") === "0") + assert(commitProps("merge.numTargetRowsDeleted") === "2") + assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0") + assert(commitProps("merge.numTargetRowsMatchedDeleted") === "1") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "1") + + sql(s"DROP TABLE $tableNameAsString") + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org