This is an automated email from the ASF dual-hosted git repository.

HeartSaVioR pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 83dc6efb16db [SPARK-56537][SS] Reset per-batch time fields and customMetrics on no batch trigger progress event
83dc6efb16db is described below

commit 83dc6efb16db78cd1fed40ca9a942f8632b92e8f
Author: Dhruv Patel <[email protected]>
AuthorDate: Thu May 14 08:07:18 2026 -0700

    [SPARK-56537][SS] Reset per-batch time fields and customMetrics on no batch trigger progress event
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up to [SPARK-56464](https://issues.apache.org/jira/browse/SPARK-56464) (commit `930c3039871`), which left a `TODO(SPARK-56537)` in `ProgressReporter#resetExecStatsForNoExecution` to track the remaining per-batch fields on `StateOperatorProgress` that were not being reset on no-data trigger progress events.
    
    Three changes:
    
    1. **Reset the per-batch time fields on no-data trigger progress events.** `allUpdatesTimeMs`, `allRemovalsTimeMs`, and `commitTimeMs` are now reset to 0 alongside the row-count fields (`numRowsUpdated`, `numRowsRemoved`, `numRowsDroppedByWatermark`) that were already handled by SPARK-56464.
    
    2. **Reset per-batch entries of `customMetrics` while preserving snapshot entries.** `StateOperatorProgress.customMetrics` carries values from two metric registries (`StateStoreCustomMetric` for provider-level, `StatefulOperatorCustomMetric` for operator-level) and conflates per-batch counters/timings with snapshot reads of state-store status (current memory usage, key counts, file size). On a no-data trigger we now zero per-batch entries and preserve snapshot entries.
    
       The snapshot/per-batch distinction is encoded at the metric definition via a new `isSnapshot: Boolean` flag on `StateStoreCustomMetric` (default `false`). The six snapshot Size metrics are marked at their definitions:
       - RocksDB (5): `rocksdbSstFileSize`, `rocksdbPinnedBlocksMemoryUsage`, `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, `rocksdbNumInternalColumnFamilies`.
       - HDFSBackedStateStoreProvider (1): `stateOnCurrentVersionSizeBytes`.
    
       `StateStoreCustomTimingMetric` and `StateStoreCustomSumMetric` keep using the trait default (always per-batch). Operator-level `StatefulOperatorCustomSumMetric` instances (declared by `BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, and `TransformWithStateExecBase`) are also always per-batch.
    
    3. **Centralize the reset semantics in a new `copyForNoExecution()` method on `StateOperatorProgress`** instead of growing `copy(...)`'s parameter list further. The method takes no parameters; it inspects the operator instance's `snapshotCustomMetricNames` (a new `private[spark]` constructor field, defaulted to `Set.empty`, populated at progress build time by `StateStoreWriter.getProgress`) to decide which `customMetrics` keys to preserve. The existing 3-arg `copy(newNumRowsUpdated, n [...]
    
    The `TODO(SPARK-56537)` comment is removed from `ProgressReporter#resetExecStatsForNoExecution`, whose body is reduced to a single delegating map: `originExecStats.stateOperators.map(_.copyForNoExecution())`.
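    
    As a rough illustration of changes 2 and 3, the following standalone Scala sketch models the idea with simplified stand-ins. `CustomMetric`, `SumMetric`, `SizeMetric`, `OperatorProgress`, and `snapshotNames` are hypothetical names that do not exist in Spark, and the numeric values are made up; only the metric names and the `isSnapshot` / `copyForNoExecution()` / `snapshotCustomMetricNames` naming are borrowed from this PR.

    ```scala
    // Simplified, standalone model of the reset semantics. These are NOT Spark's classes.
    object NoExecutionResetSketch {

      // Models a metric definition that is per-batch unless it opts in as a snapshot.
      trait CustomMetric {
        def name: String
        def isSnapshot: Boolean = false
      }
      final case class SumMetric(name: String) extends CustomMetric
      final case class SizeMetric(name: String, override val isSnapshot: Boolean = false)
        extends CustomMetric

      // Hypothetical, trimmed-down stand-in for StateOperatorProgress.
      final case class OperatorProgress(
          numRowsTotal: Long,
          numRowsUpdated: Long,
          commitTimeMs: Long,
          customMetrics: Map[String, Long],
          snapshotCustomMetricNames: Set[String]) {

        // Zero the per-batch fields; keep snapshot fields and snapshot custom metrics.
        // Keys are kept for every metric so data and idle events expose the same key set.
        def copyForNoExecution(): OperatorProgress = copy(
          numRowsUpdated = 0L,
          commitTimeMs = 0L,
          customMetrics = customMetrics.map { case (k, v) =>
            k -> (if (snapshotCustomMetricNames.contains(k)) v else 0L)
          })
      }

      // Derive the snapshot-name set from the metric definitions themselves,
      // rather than from a hardcoded list inside the reset routine.
      def snapshotNames(metrics: Seq[CustomMetric]): Set[String] =
        metrics.collect { case m if m.isSnapshot => m.name }.toSet

      // Illustrative definitions and values (assumed, not taken from a real run).
      val metrics: Seq[CustomMetric] = Seq(
        SumMetric("rocksdbPutCount"),
        SizeMetric("rocksdbSstFileSize", isSnapshot = true))

      val afterDataBatch: OperatorProgress = OperatorProgress(
        numRowsTotal = 3L,
        numRowsUpdated = 3L,
        commitTimeMs = 12L,
        customMetrics = Map("rocksdbPutCount" -> 3L, "rocksdbSstFileSize" -> 4096L),
        snapshotCustomMetricNames = snapshotNames(metrics))

      val onIdleTrigger: OperatorProgress = afterDataBatch.copyForNoExecution()
    }
    ```

    In this model, `onIdleTrigger` keeps `numRowsTotal` and `rocksdbSstFileSize` from the data batch, while `numRowsUpdated`, `commitTimeMs`, and `rocksdbPutCount` become 0.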
    
    ### Why are the changes needed?
    
    Today, on a no-data ("idle") trigger progress event, `StateOperatorProgress` carries the previous batch's values for `allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`, and most of `customMetrics`. To a user reading `query.lastProgress` / `query.recentProgress` during an idle period this looks like work was performed when none was. It is also a known source of test flakiness.
    
    The `TODO(SPARK-56537)` left by SPARK-56464 in `ProgressReporter#resetExecStatsForNoExecution` explicitly tracks this follow-up.
    
    The design was discussed on the JIRA ticket and confirmed before implementation:
    - Encode snapshot semantics at the metric definition (option (2b) in the audit comment), not via a hardcoded whitelist in the reset routine.
    - Add a new `copyForNoExecution()` method on `StateOperatorProgress` rather than growing the existing `copy(...)` argument list further (3 args after SPARK-56464 would have become 6+).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No public API change.
    
    User-visible behavior change: idle-trigger progress events emitted via `StreamingQueryListener.QueryProgressEvent`, `query.lastProgress`, and `query.recentProgress` will now report `0` for all per-batch fields and per-batch `customMetrics` entries instead of carrying stale values from the previous data batch. Snapshot fields (`numRowsTotal`, `memoryUsedBytes`, `numShufflePartitions`, `numStateStoreInstances`, snapshot custom metrics) are unchanged. Same direction as the SPARK-56464 fi [...]
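    
    To illustrate why the stale values were misleading, here is a minimal sketch of a hypothetical consumer that sums `commitTimeMs` over recent progress events. `Event` and `totalCommitTimeMs` are illustrative names, not Spark APIs, and the numbers are made up.

    ```scala
    // Hypothetical consumer-side model; not part of Spark.
    object IdleProgressSketch {

      // Minimal stand-in for one progress event of a single stateful operator.
      final case class Event(isDataBatch: Boolean, commitTimeMs: Long)

      // Total commit time a dashboard might derive from recent progress events.
      def totalCommitTimeMs(events: Seq[Event]): Long =
        events.map(_.commitTimeMs).sum

      // One data batch (12 ms commit) followed by two idle triggers.
      // Old behavior: idle events repeat the previous batch's value.
      val before: Seq[Event] =
        Seq(Event(isDataBatch = true, 12L), Event(isDataBatch = false, 12L),
          Event(isDataBatch = false, 12L))

      // New behavior: idle events report 0 for per-batch fields.
      val after: Seq[Event] =
        Seq(Event(isDataBatch = true, 12L), Event(isDataBatch = false, 0L),
          Event(isDataBatch = false, 0L))
    }
    ```

    Under these assumed numbers, `totalCommitTimeMs(before)` is 36 while `totalCommitTimeMs(after)` is 12, which matches the single batch that actually ran.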
    
    ### How was this patch tested?
    
    New and updated tests in `ProgressReporterSuite.scala`:
    
    1. Extended the SPARK-56464 test with assertions that the three time fields (`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`) are reset to 0 on the idle trigger, alongside the existing row-count assertions. Test description updated from "no-data batch resets numRowsRemoved to zero" to "no-data batch resets all per-batch StateOperatorProgress fields to zero" to reflect the broader scope.
    
    2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but preserves snapshot customMetrics (RocksDB)" exercising the per-batch / snapshot split end-to-end against a real `RocksDBStateStoreProvider`. The test runs one data batch, advances the manual clock to trigger an idle progress event, then asserts that 3 per-batch RocksDB metrics (`rocksdbCommitFlushLatency`, `rocksdbPutCount`, `rocksdbTotalBytesWritten`) are reset to 0 on idle, while 5 snapshot RocksDB metrics (` [...]
    
    Local verification:
    - `build/sbt 'sql/testOnly *ProgressReporterSuite'` -> 2/2 tests pass.
    - `build/sbt 'sql/testOnly *ProgressReporterSuite *StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite *StreamingDeduplicationSuite *MultiStatefulOperatorsSuite'` -> 240 tests pass in 7m 16s.
    - `dev/mima` -> no exclusions required.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7
    
    Closes #55699 from DHRUV6029/SPARK-56537.
    
    Authored-by: Dhruv Patel <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../sql/tests/streaming/test_streaming_listener.py |   7 +-
 .../org/apache/spark/sql/streaming/progress.scala  |  34 +++++-
 .../operators/stateful/statefulOperators.scala     |  46 +++++--
 .../streaming/runtime/ProgressReporter.scala       |   6 +-
 .../state/HDFSBackedStateStoreProvider.scala       |   3 +-
 .../state/RocksDBStateStoreProvider.scala          |  17 ++-
 .../sql/execution/streaming/state/StateStore.scala |   9 +-
 .../streaming/ProgressReporterSuite.scala          | 132 ++++++++++++++++++++-
 8 files changed, 227 insertions(+), 27 deletions(-)

diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
index 620186f70b66..b4922f54b217 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -312,7 +312,12 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):
         )
         self.assertEqual(
             get_number_of_public_methods("org.apache.spark.sql.streaming.StateOperatorProgress"),
-            27,
+            # SPARK-56537: bumped from 27 to 30 due to the new snapshotCustomMetricNames
+            # constructor parameter (getter + synthetic default) and the new internal
+            # copyForNoExecution() method on StateOperatorProgress. Both are non-public
+            # API (private[spark] / private[sql]) so they are not mirrored on the
+            # Python side; only the count needs updating.
+            30,
             msg,
         )
         self.assertEqual(
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 0502936e3cc4..619d8fb53311 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -56,7 +56,10 @@ class StateOperatorProgress private[spark] (
     val numRowsDroppedByWatermark: Long,
     val numShufflePartitions: Long,
     val numStateStoreInstances: Long,
-    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap(),
+    // Names of customMetrics entries treated as snapshots of state-store status;
+    // preserved by copyForNoExecution() and not surfaced in JSON output.
+    private[spark] val snapshotCustomMetricNames: Set[String] = Set.empty)
     extends Serializable {
 
   /** The compact JSON representation of this progress. */
@@ -81,7 +84,34 @@ class StateOperatorProgress private[spark] (
       numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
       numShufflePartitions = numShufflePartitions,
       numStateStoreInstances = numStateStoreInstances,
-      customMetrics = customMetrics)
+      customMetrics = customMetrics,
+      snapshotCustomMetricNames = snapshotCustomMetricNames)
+
+  /**
+   * Returns a copy of this progress suitable for a no-data trigger event. Per-batch fields (row
+   * counts, time-Ms fields, and customMetrics entries not in `snapshotCustomMetricNames`) are
+   * zeroed; snapshot fields and snapshot customMetrics entries are preserved.
+   */
+  private[sql] def copyForNoExecution(): StateOperatorProgress = {
+    val newCustomMetrics = new ju.HashMap[String, JLong](customMetrics.size())
+    customMetrics.forEach { (k, v) =>
+      newCustomMetrics.put(k, if (snapshotCustomMetricNames.contains(k)) v else 0L)
+    }
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = 0L,
+      allUpdatesTimeMs = 0L,
+      numRowsRemoved = 0L,
+      allRemovalsTimeMs = 0L,
+      commitTimeMs = 0L,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = 0L,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = newCustomMetrics,
+      snapshotCustomMetricNames = snapshotCustomMetricNames)
+  }
 
   private[sql] def jsonValue: JValue = {
     ("operatorName" -> JString(operatorName)) ~
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
index 9fcc0a506570..59a2b9ee74f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
@@ -137,6 +137,9 @@ trait StatefulOperatorCustomMetric {
   def name: String
   def desc: String
   def createSQLMetric(sparkContext: SparkContext): SQLMetric
+  // True if the metric reflects current state rather than per-batch work; snapshot
+  // metrics are preserved on no-data trigger events. Mirrors StateStoreCustomMetric.
+  def isSnapshot: Boolean = false
 }
 
 /** Custom stateful operator metric for simple "count" gauge */
@@ -402,7 +405,8 @@ trait StateStoreWriter
       numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value,
       numShufflePartitions = stateInfo.map(_.numPartitions.toLong).getOrElse(-1L),
       numStateStoreInstances = longMetric("numStateStoreInstances").value,
-      javaConvertedCustomMetrics
+      javaConvertedCustomMetrics,
+      snapshotCustomMetricNames
     )
   }
 
@@ -475,17 +479,43 @@ trait StateStoreWriter
     }.toMap
   }
 
-  private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = {
+  // All instance metrics with their (partitionId, storeName) bindings; consumed by
+  // both `stateStoreInstanceMetrics` (for SQLMetric registration) and
+  // `snapshotCustomMetricNames` (for the snapshot-name set). The result is a
+  // serializable Seq so storing it as a lazy val on this trait is safe even when
+  // the enclosing SparkPlan is shipped to executors. The provider itself is NOT
+  // stored as a field (it is non-serializable), so each consumer below recreates
+  // it locally.
+  private lazy val stateStoreInstanceMetricsWithIds: Seq[StateStoreInstanceMetric] = {
     val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
-    val maxPartitions = stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions)
-
+    val maxPartitions =
+      stateInfo.map(_.numPartitions).getOrElse(conf.defaultNumShufflePartitions)
     (0 until maxPartitions).flatMap { partitionId =>
       provider.supportedInstanceMetrics.flatMap { metric =>
-        stateStoreNames.map { storeName =>
-          val metricWithPartition = metric.withNewId(partitionId, storeName)
-          (metricWithPartition, metricWithPartition.createSQLMetric(sparkContext))
-        }
+        stateStoreNames.map(metric.withNewId(partitionId, _))
       }
+    }
+  }
+
+  // Names of customMetrics entries treated as snapshots; preserved by
+  // StateOperatorProgress.copyForNoExecution() on no-data trigger events. Includes
+  // provider- and operator-level metrics with isSnapshot = true, and all instance
+  // metric names (instance metrics use sentinel inits like -1 with monotonic
+  // combine, so they are always snapshot-style).
+  private lazy val snapshotCustomMetricNames: Set[String] = {
+    val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
+    val customSnapshots = provider.supportedCustomMetrics.collect {
+      case m if m.isSnapshot => m.name
+    }.toSet
+    val operatorSnapshots = customStatefulOperatorMetrics.collect {
+      case m if m.isSnapshot => m.name
+    }.toSet
+    customSnapshots ++ operatorSnapshots ++ stateStoreInstanceMetricsWithIds.map(_.name).toSet
+  }
+
+  private def stateStoreInstanceMetrics: Map[StateStoreInstanceMetric, SQLMetric] = {
+    stateStoreInstanceMetricsWithIds.map { metric =>
+      (metric, metric.createSQLMetric(sparkContext))
     }.toMap
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
index 73b75df1a599..161696fb9260 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
@@ -648,13 +648,9 @@ abstract class ProgressContext(
    * New execution stats will only retain the values as a snapshot of the query status.
    * (E.g. for stateful operators, numRowsTotal is a snapshot of the status, whereas
    * numRowsUpdated is bound to the batch.)
-   * TODO(SPARK-56537): We do not seem to clear up all values in StateOperatorProgress which are
-   * bound to the batch. Fix this.
    */
   private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): ExecutionStats = {
-    val newStatefulOperators = originExecStats.stateOperators.map { so =>
-      so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0, newNumRowsRemoved = 0)
-    }
+    val newStatefulOperators = originExecStats.stateOperators.map(_.copyForNoExecution())
     val newEventTimeStats = if (originExecStats.eventTimeStats.contains("watermark")) {
       Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 29a17b4eb7ec..2562f1ff3304 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -566,7 +566,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 
   private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
     StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
-      "estimated size of state only on current version")
+      "estimated size of state only on current version",
+      isSnapshot = true)
 
   private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
     StateStoreCustomSumMetric("loadedMapCacheHitCount",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 45168f407132..b3d734c71f91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -1488,22 +1488,27 @@ object RocksDBStateStoreProvider {
   val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric(
     "rocksdbTotalBytesWrittenByFlush",
     "RocksDB: flush - total bytes written by flush")
+  // Snapshot metrics: read current RocksDB state, preserved on no-data trigger events.
   val CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE = StateStoreCustomSizeMetric(
     "rocksdbPinnedBlocksMemoryUsage",
-    "RocksDB: memory usage for pinned blocks")
+    "RocksDB: memory usage for pinned blocks",
+    isSnapshot = true)
   val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS = StateStoreCustomSizeMetric(
     "rocksdbNumInternalColFamiliesKeys",
-    "RocksDB: number of internal keys for internal column families")
+    "RocksDB: number of internal keys for internal column families",
+    isSnapshot = true)
   val CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric(
     "rocksdbNumExternalColumnFamilies",
-    "RocksDB: number of external column families")
+    "RocksDB: number of external column families",
+    isSnapshot = true)
   val CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES = StateStoreCustomSizeMetric(
     "rocksdbNumInternalColumnFamilies",
-    "RocksDB: number of internal column families")
+    "RocksDB: number of internal column families",
+    isSnapshot = true)
 
-  // Total SST file size
+  // Total SST file size (snapshot).
   val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric(
-    "rocksdbSstFileSize", "RocksDB: size of all SST files")
+    "rocksdbSstFileSize", "RocksDB: size of all SST files", isSnapshot = true)
   val CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED = StateStoreCustomSumMetric(
     "rocksdbNumSnapshotsAutoRepaired",
     "RocksDB: number of snapshots that were automatically repaired during store load")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index e3601f1ef224..ad067b8edcc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -537,6 +537,10 @@ trait StateStoreCustomMetric {
   def desc: String
   def withNewDesc(desc: String): StateStoreCustomMetric
   def createSQLMetric(sparkContext: SparkContext): SQLMetric
+
+  // True if the metric reflects current store state (e.g. file size, memory) rather
+  // than per-batch work; snapshot metrics are preserved on no-data trigger events.
+  def isSnapshot: Boolean = false
 }
 
 case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric {
@@ -546,7 +550,10 @@ case class StateStoreCustomSumMetric(name: String, desc: String) extends StateSt
     SQLMetrics.createMetric(sparkContext, desc)
 }
 
-case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric {
+case class StateStoreCustomSizeMetric(
+    name: String,
+    desc: String,
+    override val isSnapshot: Boolean = false) extends StateStoreCustomMetric {
   override def withNewDesc(desc: String): StateStoreCustomSizeMetric = copy(desc = desc)
 
   override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala
index da037936849e..134bfc6b914b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProgressReporterSuite.scala
@@ -20,15 +20,16 @@ package org.apache.spark.sql.execution.streaming
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
 import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress, StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 
 class ProgressReporterSuite extends StreamTest {
   import testImplicits._
 
-  test("no-data batch resets numRowsRemoved to zero" +
+  test("no-data batch resets all per-batch StateOperatorProgress fields to zero" +
       " via resetExecStatsForNoExecution") {
     val clock = new StreamManualClock
     val input = MemoryStream[Int]
@@ -73,7 +74,7 @@ class ProgressReporterSuite extends StreamTest {
             .exists(_.stateOperators.head.numRowsRemoved > 0)
           assert(removed, "Expected numRowsRemoved > 0")
         },
-        // Idle trigger — finishNoExecutionTrigger calls
+        // Idle trigger: finishNoExecutionTrigger calls
         // resetExecStatsForNoExecution which must zero out
         // per-batch metrics.
         AdvanceManualClock(1 * 1000),
@@ -94,10 +95,135 @@ class ProgressReporterSuite extends StreamTest {
             assert(so.numRowsUpdated === 0, s"numRowsUpdated=${so.numRowsUpdated}")
             assert(so.numRowsDroppedByWatermark === 0,
               s"numRowsDroppedByWatermark=${so.numRowsDroppedByWatermark}")
+            assert(so.allUpdatesTimeMs === 0,
+              s"allUpdatesTimeMs=${so.allUpdatesTimeMs}")
+            assert(so.allRemovalsTimeMs === 0,
+              s"allRemovalsTimeMs=${so.allRemovalsTimeMs}")
+            assert(so.commitTimeMs === 0,
+              s"commitTimeMs=${so.commitTimeMs}")
           }
         },
         StopStream
       )
     }
   }
+
+  test("SPARK-56537: no-data batch resets per-batch customMetrics but" +
+      " preserves snapshot customMetrics (RocksDB)") {
+    val clock = new StreamManualClock
+    val input = MemoryStream[Int]
+    val agg = input.toDF()
+      .select(timestamp_seconds($"value") as "ts", $"value")
+      .withWatermark("ts", "10 seconds")
+      .groupBy(window($"ts", "10 seconds"))
+      .agg(count("*") as "cnt")
+      .select($"window".getField("start").cast("long"), $"cnt")
+
+    withSQLConf(
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+          classOf[RocksDBStateStoreProvider].getName,
+        SQLConf.STREAMING_POLLING_DELAY.key -> "0",
+        SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "0") {
+      testStream(agg, outputMode = OutputMode.Update)(
+        StartStream(
+          Trigger.ProcessingTime("1 second"),
+          triggerClock = clock),
+        // Batch 0: real data, populates customMetrics with non-zero per-batch values.
+        AddData(input, 1, 2, 3),
+        AdvanceManualClock(1 * 1000),
+        CheckNewAnswer((0L, 3L)),
+        // Idle trigger.
+        AdvanceManualClock(1 * 1000),
+        Execute("verify customMetrics behavior on idle trigger") { q =>
+          eventually(Timeout(streamingTimeout)) {
+            val progress = q.recentProgress.filter(_.stateOperators.nonEmpty)
+            val lastDataIdx = progress.lastIndexWhere { p =>
+              p.durationMs.containsKey("addBatch")
+            }
+            assert(lastDataIdx >= 0, "no data batch found")
+            val idleIdx = progress.indexWhere(
+              !_.durationMs.containsKey("addBatch"), lastDataIdx + 1)
+            assert(idleIdx > lastDataIdx,
+              "no idle trigger found after data batch")
+
+            val dataCm = progress(lastDataIdx).stateOperators.head.customMetrics
+            val idleCm = progress(idleIdx).stateOperators.head.customMetrics
+
+            // Per-batch RocksDB metrics: zeroed on idle. The metric must be present
+            // in the map (we keep keys consistent across data and idle progress).
+            Seq("rocksdbCommitFlushLatency",
+                "rocksdbPutCount",
+                "rocksdbTotalBytesWritten").foreach { k =>
+              assert(idleCm.containsKey(k), s"$k missing on idle")
+              assert(idleCm.get(k) === 0L,
+                s"per-batch metric $k expected 0 on idle, got ${idleCm.get(k)}")
+            }
+
+            // Snapshot RocksDB metrics: value unchanged across idle trigger.
+            Seq("rocksdbPinnedBlocksMemoryUsage",
+                "rocksdbNumInternalColFamiliesKeys",
+                "rocksdbNumExternalColumnFamilies",
+                "rocksdbNumInternalColumnFamilies",
+                "rocksdbSstFileSize").foreach { k =>
+              assert(idleCm.containsKey(k), s"$k missing on idle")
+              assert(idleCm.get(k) === dataCm.get(k),
+                s"snapshot metric $k changed across idle trigger: " +
+                  s"data=${dataCm.get(k)} idle=${idleCm.get(k)}")
+            }
+          }
+        },
+        StopStream
+      )
+    }
+  }
+
+  test("SPARK-56537: copyForNoExecution zeroes per-batch fields and preserves snapshot fields") {
+    val customMetrics = new java.util.HashMap[String, java.lang.Long]()
+    customMetrics.put("perBatchTimer", 100L)
+    customMetrics.put("perBatchCounter", 50L)
+    customMetrics.put("snapshotSize", 999L)
+    val orig = new StateOperatorProgress(
+      operatorName = "op",
+      numRowsTotal = 50L,
+      numRowsUpdated = 10L,
+      allUpdatesTimeMs = 7L,
+      numRowsRemoved = 3L,
+      allRemovalsTimeMs = 5L,
+      commitTimeMs = 11L,
+      memoryUsedBytes = 2048L,
+      numRowsDroppedByWatermark = 2L,
+      numShufflePartitions = 4L,
+      numStateStoreInstances = 4L,
+      customMetrics = customMetrics,
+      snapshotCustomMetricNames = Set("snapshotSize"))
+
+    val out = orig.copyForNoExecution()
+
+    // Per-batch fields are zeroed.
+    assert(out.numRowsUpdated === 0L)
+    assert(out.allUpdatesTimeMs === 0L)
+    assert(out.numRowsRemoved === 0L)
+    assert(out.allRemovalsTimeMs === 0L)
+    assert(out.commitTimeMs === 0L)
+    assert(out.numRowsDroppedByWatermark === 0L)
+
+    // Snapshot fields are preserved.
+    assert(out.operatorName === "op")
+    assert(out.numRowsTotal === 50L)
+    assert(out.memoryUsedBytes === 2048L)
+    assert(out.numShufflePartitions === 4L)
+    assert(out.numStateStoreInstances === 4L)
+
+    // customMetrics: per-batch zeroed, snapshot preserved.
+    assert(out.customMetrics.get("perBatchTimer") === 0L)
+    assert(out.customMetrics.get("perBatchCounter") === 0L)
+    assert(out.customMetrics.get("snapshotSize") === 999L)
+
+    // Original is not mutated.
+    assert(orig.numRowsUpdated === 10L)
+    assert(orig.customMetrics.get("perBatchTimer") === 100L)
+
+    // snapshotCustomMetricNames is preserved so subsequent copy() round-trips work.
+    assert(out.snapshotCustomMetricNames === Set("snapshotSize"))
+  }
 }

