This is an automated email from the ASF dual-hosted git repository.

gengliangwang pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new a985468ed74d [SPARK-56551][SQL][FOLLOW-UP] Fix setting 
`numDeletedRows` metric as -1
a985468ed74d is described below

commit a985468ed74d4090d3af19300fd3dd83441cdf33
Author: Ziya Mukhtarov <[email protected]>
AuthorDate: Mon May 11 10:55:25 2026 -0700

    [SPARK-56551][SQL][FOLLOW-UP] Fix setting `numDeletedRows` metric as -1
    
    ### What changes were proposed in this pull request?
    
    We were previously calling `SQLMetric.set(-1)` when we couldn't compute the 
value of the `numDeletedRows` metric. However, that call is a no-op, so the 
write summary reported the metric as 0 instead. This PR fixes the write summary 
to report -1 as intended.
    
    ### Why are the changes needed?
    
    Fix the bug above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a new test, `delete with NOT IN over empty subquery`, to `DeleteFromTableSuiteBase`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7
    
    Closes #55576 from ZiyaZa/fix-negative-numdeletedrows.
    
    Authored-by: Ziya Mukhtarov <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
    (cherry picked from commit 759036d6088b205f12f4f6073ce741896af42b10)
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 91 ++++++++++++----------
 .../sql/connector/DeleteFromTableSuiteBase.scala   | 23 ++++++
 2 files changed, 73 insertions(+), 41 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index ccfcdc1855f0..3cbfed40d876 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -365,24 +365,24 @@ case class ReplaceDataExec(
     copy(query = newChild)
   }
 
-  override protected def getWriteSummary(query: SparkPlan): 
Option[WriteSummary] = {
-    if (rowLevelCommand == DELETE) {
-      // DELETE ReplaceData plans filter out the deleted rows early in the 
plan, and they don't
-      // reach this node. We need to calculate this value as numScannedRows - 
numCopiedRows.
-      val numScannedRows = collectFirst(query) {
-        case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] 
=>
-          getMetricValue(b.metrics, "numOutputRows")
-      }
-      val numCopiedRows = getMetricValue(metrics, "numCopiedRows")
-      val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows 
>= 0) {
-        numScannedRows.get - numCopiedRows
-      } else {
-        // One of the metrics couldn't be found, also mark numDeletedRows as 
not found.
-        -1L
-      }
-      metrics("numDeletedRows").set(numDeletedRows)
+  override protected def getDeleteSummary(): Option[DeleteSummaryImpl] = {
+    // DELETE ReplaceData plans filter out the deleted rows early in the plan, 
and they don't
+    // reach this node. We need to calculate this value as numScannedRows - 
numCopiedRows.
+    val numScannedRows = collectFirst(query) {
+      case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>
+        getMetricValue(b.metrics, "numOutputRows")
     }
-    super.getWriteSummary(query)
+    val numCopiedRows = getMetricValue(sparkMetrics, "numCopiedRows")
+    val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows >= 
0) {
+      numScannedRows.get - numCopiedRows
+    } else {
+      // One of the metrics couldn't be found, also mark numDeletedRows as not 
found.
+      -1L
+    }
+
+    // SQLMetric.set is a no-op if value is -1, leaving the metric in its 
invalid state.
+    sparkMetrics("numDeletedRows").set(numDeletedRows)
+    super.getDeleteSummary().map(_.copy(numDeletedRows = numDeletedRows))
   }
 }
 
@@ -496,31 +496,40 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec {
     metrics.get(name).map(_.value).getOrElse(-1L)
   }
 
-  override protected def getWriteSummary(query: SparkPlan): 
Option[WriteSummary] = {
+  override protected def getWriteSummary(): Option[WriteSummary] = {
     rowLevelCommand match {
-      case MERGE =>
-        collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
-          val metrics = n.metrics
-          MergeSummaryImpl(
-            getMetricValue(metrics, "numTargetRowsCopied"),
-            getMetricValue(metrics, "numTargetRowsDeleted"),
-            getMetricValue(metrics, "numTargetRowsUpdated"),
-            getMetricValue(metrics, "numTargetRowsInserted"),
-            getMetricValue(metrics, "numTargetRowsMatchedUpdated"),
-            getMetricValue(metrics, "numTargetRowsMatchedDeleted"),
-            getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"),
-            getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"))
-        }
-      case UPDATE =>
-        Some(UpdateSummaryImpl(
-          getMetricValue(sparkMetrics, "numUpdatedRows"),
-          getMetricValue(sparkMetrics, "numCopiedRows")))
-      case DELETE =>
-        Some(DeleteSummaryImpl(
-          getMetricValue(sparkMetrics, "numDeletedRows"),
-          getMetricValue(sparkMetrics, "numCopiedRows")))
+      case MERGE => getMergeSummary()
+      case UPDATE => getUpdateSummary()
+      case DELETE => getDeleteSummary()
     }
   }
+
+  protected def getMergeSummary(): Option[MergeSummaryImpl] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
+      val metrics = n.metrics
+      MergeSummaryImpl(
+        getMetricValue(metrics, "numTargetRowsCopied"),
+        getMetricValue(metrics, "numTargetRowsDeleted"),
+        getMetricValue(metrics, "numTargetRowsUpdated"),
+        getMetricValue(metrics, "numTargetRowsInserted"),
+        getMetricValue(metrics, "numTargetRowsMatchedUpdated"),
+        getMetricValue(metrics, "numTargetRowsMatchedDeleted"),
+        getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"),
+        getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"))
+    }
+  }
+
+  protected def getUpdateSummary(): Option[UpdateSummaryImpl] = {
+    Some(UpdateSummaryImpl(
+      getMetricValue(sparkMetrics, "numUpdatedRows"),
+      getMetricValue(sparkMetrics, "numCopiedRows")))
+  }
+
+  protected def getDeleteSummary(): Option[DeleteSummaryImpl] = {
+    Some(DeleteSummaryImpl(
+      getMetricValue(sparkMetrics, "numDeletedRows"),
+      getMetricValue(sparkMetrics, "numCopiedRows")))
+  }
 }
 
 /**
@@ -582,7 +591,7 @@ trait V2TableWriteExec
         }
       )
 
-      val writeSummary = getWriteSummary(query)
+      val writeSummary = getWriteSummary()
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, 
batchWrite)} is committing.")
       writeSummary match {
         case Some(summary) => batchWrite.commit(messages, summary)
@@ -610,7 +619,7 @@ trait V2TableWriteExec
     Nil
   }
 
-  protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = None
+  protected def getWriteSummary(): Option[WriteSummary] = None
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with 
Serializable {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index f8d81ee08691..89e3ce503fed 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -959,6 +959,29 @@ abstract class DeleteFromTableSuiteBase extends 
RowLevelOperationSuiteBase {
         Row(2, 200, "software")))
   }
 
+  test("delete with NOT IN over empty subquery") {
+    withTempView("empty_subq") {
+      createAndInitTable("pk INT NOT NULL, id INT NOT NULL, dep STRING",
+        """{ "pk": 1, "id": 1, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "dep": "hr" }
+          |{ "pk": 3, "id": 3, "dep": "hr" }
+          |""".stripMargin)
+
+      Seq.empty[Int].toDF("v").createOrReplaceTempView("empty_subq")
+
+      sql(
+        s"""DELETE FROM $tableNameAsString
+           |WHERE id NOT IN (SELECT v FROM empty_subq)
+           |""".stripMargin)
+
+      checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil)
+      // The filter gets replaced by an EmptyRelation in the ReplaceData 
executed plan, which hides
+      // the executed BatchScan and prevents computing numDeletedRows using 
numOutputRows of the
+      // scan node.
+      checkDeleteMetrics(numDeletedRows = if (deltaDelete) 3 else -1, 
numCopiedRows = 0)
+    }
+  }
+
   private def executeDeleteWithFilters(query: String): Unit = {
     val executedPlan = executeAndKeepPlan {
       sql(query)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to