This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d71afc7ea0f [SPARK-51682][SS] State Store Checkpoint V2 should handle offset log ahead of commit log correctly
5d71afc7ea0f is described below

commit 5d71afc7ea0f7fbab9b0f6d85e589c3ac6bc8a78
Author: Siying Dong <dong...@gmail.com>
AuthorDate: Wed Apr 2 14:18:50 2025 +0900

    [SPARK-51682][SS] State Store Checkpoint V2 should handle offset log ahead of commit log correctly
    
    ### What changes were proposed in this pull request?
    When State Store Checkpoint format V2 is used, always read the checkpoint IDs back from the commit log, rather than only when the commit log is in sync with the offset log.
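    
    For illustration, a self-contained toy sketch of the recovery path after this change (simplified, hypothetical types and method; the real logic and names are in the MicroBatchExecution diff below):
    
    ```scala
    import scala.collection.mutable
    
    object CheckpointV2RecoverySketch {
      // Simplified stand-in for the commit log metadata: per-operator state
      // store checkpoint IDs written with the last committed batch.
      final case class CommitMetadata(stateUniqueIds: Option[Map[Long, Array[Array[String]]]])
    
      def recover(
          latestBatchId: Long,
          latestCommit: Option[(Long, CommitMetadata)],
          currentStateStoreCkptId: mutable.Map[Long, Array[Array[String]]]): Unit = {
        latestCommit match {
          case Some((latestCommittedBatchId, commitMetadata)) =>
            // The fix: read the checkpoint IDs back from the commit log
            // unconditionally, before checking how the two logs line up.
            commitMetadata.stateUniqueIds.foreach { ids => currentStateStoreCkptId ++= ids }
            if (latestBatchId == latestCommittedBatchId) {
              // Last batch fully committed: plan a fresh batch.
            } else if (latestCommittedBatchId == latestBatchId - 1) {
              // Offset log is one batch ahead of the commit log: re-run the last
              // batch; its stateful tasks now have checkpoint IDs to recover from.
            }
          case None => // nothing committed yet
        }
      }
    }
    ```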
    
    ### Why are the changes needed?
    There is currently a bug in how the checkpoint IDs are read from the commit log when a query restarts: if the offset log is ahead of the commit log, the checkpoint IDs are never read, so the tasks have no checkpoint ID to recover from and the query fails.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Add a unit test that will fail without the fix.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50480 from siying/checkpoint_v2_commit_read.
    
    Authored-by: Siying Dong <dong...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../execution/streaming/MicroBatchExecution.scala  |  6 +--
 .../RocksDBStateStoreCheckpointFormatV2Suite.scala | 59 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fe06cbb19c3a..c977a499edc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -501,6 +501,9 @@ class MicroBatchExecution(
          * i.e., committedBatchId + 1 */
         commitLog.getLatest() match {
           case Some((latestCommittedBatchId, commitMetadata)) =>
+            commitMetadata.stateUniqueIds.foreach {
+              stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds
+            }
             if (latestBatchId == latestCommittedBatchId) {
              /* The last batch was successfully committed, so we can safely process a
                * new next batch but first:
@@ -520,9 +523,6 @@ class MicroBatchExecution(
               execCtx.startOffsets ++= execCtx.endOffsets
               watermarkTracker.setWatermark(
                math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
-              commitMetadata.stateUniqueIds.foreach {
-                stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds
-              }
             } else if (latestCommittedBatchId == latestBatchId - 1) {
               execCtx.endOffsets.foreach {
                 case (source: Source, end: Offset) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index ffbeaead9512..22150ffde5db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -23,12 +23,12 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.Tag
 
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.{SparkContext, SparkException, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
 import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
-import org.apache.spark.sql.functions.count
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.OutputMode.Update
@@ -836,6 +836,61 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
     }
   }
 
+  /**
+   * This test verifies behavior when the query fails during execution.
+   * After a restart, the job should still recover from the last committed batch.
+   */
+  testWithCheckpointInfoTracked(s"checkpointFormatVersion2 query failure and restart") {
+    withTempDir { checkpointDir =>
+
+      var forceTaskFailure = false
+      val failUDF = udf((value: Int) => {
+        if (forceTaskFailure) {
+          // This fails every UDF invocation to trigger query failures in the execution phase.
+          throw new RuntimeException("Ingest task failure")
+        }
+        value
+      })
+
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData
+          .toDF()
+          .select(failUDF($"value").as("value"))
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Update)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        Execute { _ =>
+          forceTaskFailure = true
+        },
+        AddData(inputData, 3, 2),
+        ExpectFailure[SparkException] { ex =>
+          ex.getCause.getMessage.contains("FAILED_EXECUTE_UDF")
+        }
+      )
+
+      forceTaskFailure = false
+
+      // Test recovery
+      testStream(aggregated, Update)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData, 3, 2, 1),
+        CheckLastBatch((3, 3), (2, 2), (1, 1)),
+        // By default we run in new tuple mode.
+        AddData(inputData, 4, 4, 4, 4),
+        CheckLastBatch((4, 4)),
+        AddData(inputData, 5, 5),
+        CheckLastBatch((5, 2)),
+        StopStream
+      )
+    }
+  }
+
   testWithCheckpointInfoTracked(s"checkpointFormatVersion2 validate ID") {
     withTempDir { checkpointDir =>
       val inputData = MemoryStream[Int]

