This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4765c15a442e [SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition
4765c15a442e is described below

commit 4765c15a442ee92022b28398ba52136b8e05082d
Author: Livia Zhu <livia....@databricks.com>
AuthorDate: Thu Mar 27 12:24:39 2025 +0900

    [SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition
    
    ### What changes were proposed in this pull request?
    
    Return StateStoreCheckpointInfo as part of RocksDB.commit() and store it locally in the RocksDBStateStore, so that RocksDBStateStore.getStateStoreCheckpointInfo() always returns the checkpoint info belonging to its own commit.
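    
    In sketch form, the fix moves the checkpoint-info hand-off into the commit path itself. The following is a minimal, self-contained Scala illustration of the idea, not the real implementation: the `*Sketch` classes are hypothetical stand-ins, and the `StateStoreCheckpointInfo` fields are assumed to follow the shape used in the diff below.
    
    ```scala
    import java.util.UUID
    
    // Assumed shape of the checkpoint info; field names follow the diff below.
    case class StateStoreCheckpointInfo(
        partitionId: Int,
        batchVersion: Long,
        stateStoreCkptId: Option[String],
        baseStateStoreCkptId: Option[String])
    
    // Hypothetical stand-in for RocksDB: commit() now returns the checkpoint
    // info captured at commit time, instead of exposing a getter that can race
    // with later commits against the shared instance.
    class RocksDBSketch(partitionId: Int) {
      private var loadedVersion: Long = 0L
    
      def commit(): (Long, StateStoreCheckpointInfo) = {
        loadedVersion += 1
        val info = StateStoreCheckpointInfo(
          partitionId, loadedVersion, Some(UUID.randomUUID().toString), None)
        (loadedVersion, info)
      }
    }
    
    // Hypothetical stand-in for RocksDBStateStore: each store instance caches
    // the info from its own commit locally.
    class StateStoreSketch(db: RocksDBSketch) {
      private var checkpointInfo: Option[StateStoreCheckpointInfo] = None
    
      def commit(): Long = {
        val (newVersion, info) = db.commit()
        checkpointInfo = Some(info)
        newVersion
      }
    
      // Failing loudly when the store never committed matches the new
      // STATE_STORE_OPERATION_OUT_OF_ORDER error introduced in the diff.
      def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo =
        checkpointInfo.getOrElse(throw new IllegalStateException(
          "Cannot get checkpointInfo without committing the store"))
    }
    ```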
    
    ### Why are the changes needed?
    
    Fixes the bug explained in SPARK-51573. This race condition can result in tasks getting incorrect checkpointInfo, which is a correctness bug.
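    
    Concretely, both stores used to read the latest checkpoint info off the shared RocksDB instance after committing. A rough sketch of the problematic interleaving, following the shape of the new unit test added by this patch (provider setup elided):
    
    ```scala
    // Two back-to-back commits against the same provider.
    val store1 = provider.getStore(0)
    val store1NewVersion = store1.commit()  // shared RocksDB now at version 1
    val store2 = provider.getStore(1)
    val store2NewVersion = store2.commit()  // shared RocksDB now at version 2
    
    // Before this fix, both calls below consulted the shared instance's
    // *latest* checkpoint info, so store1 could get the info from store2's
    // commit. With the fix, each store returns the info cached at its commit:
    assert(store1.getStateStoreCheckpointInfo().batchVersion == store1NewVersion)
    assert(store2.getStateStoreCheckpointInfo().batchVersion == store2NewVersion)
    ```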
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50344 from liviazhu-db/liviazhu-db/checkpointinfo-race.
    
    Authored-by: Livia Zhu <livia....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-conditions.json |  6 +++++
 .../sql/execution/streaming/state/RocksDB.scala    | 14 +++++------
 .../state/RocksDBStateStoreProvider.scala          | 13 +++++++---
 .../streaming/state/StateStoreErrors.scala         | 10 ++++++++
 .../RocksDBStateStoreCheckpointFormatV2Suite.scala | 29 ++++++++++++++++++++++
 .../execution/streaming/state/RocksDBSuite.scala   |  8 +++---
 6 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 6417ed22344e..417170d4b744 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4859,6 +4859,12 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_OPERATION_OUT_OF_ORDER" : {
+    "message" : [
+      "Streaming stateful operator attempted to access state store out of order. This is a bug, please retry. error_msg=<errorMsg>"
+    ],
+    "sqlState" : "XXKST"
+  },
   "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" : {
     "message" : [
       "The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplay.",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index aa8e96bac046..d4bf9d31617b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -60,11 +60,10 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
  *
  * @note This class is not thread-safe, so use it only from one thread.
 * @see [[RocksDBFileManager]] to see how the files are laid out in local disk and DFS.
- * @param dfsRootDir  Remote directory where checkpoints are going to be written
 * @param conf         Configuration for RocksDB
+ * @param stateStoreId StateStoreId for the state store
 * @param localRootDir Root directory in local disk that is used to working and checkpointing dirs
 * @param hadoopConf   Hadoop configuration for talking to the remote file system
- * @param loggingId    Id that will be prepended in logs for isolating concurrent RocksDBs
  */
 class RocksDB(
     dfsRootDir: String,
@@ -73,7 +72,8 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "",
     useColumnFamilies: Boolean = false,
-    enableStateStoreCheckpointIds: Boolean = false) extends Logging {
+    enableStateStoreCheckpointIds: Boolean = false,
+    partitionId: Int = 0) extends Logging {
 
   import RocksDB._
 
@@ -991,7 +991,7 @@ class RocksDB(
    * - Create a RocksDB checkpoint in a new local dir
    * - Sync the checkpoint dir files to DFS
    */
-  def commit(): Long = {
+  def commit(): (Long, StateStoreCheckpointInfo) = {
     val newVersion = loadedVersion + 1
     try {
      logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}")
@@ -1060,7 +1060,7 @@ class RocksDB(
       recordedMetrics = Some(metrics)
       logInfo(log"Committed ${MDC(LogKeys.VERSION_NUM, newVersion)}, " +
         log"stats = ${MDC(LogKeys.METRICS_JSON, recordedMetrics.get.json)}")
-      loadedVersion
+      (loadedVersion, getLatestCheckpointInfo)
     } catch {
       case t: Throwable =>
         loadedVersion = -1  // invalidate loaded version
@@ -1228,11 +1228,11 @@ class RocksDB(
  def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache)
 
   /**
-   * Called by RocksDBStateStoreProvider to retrieve the checkpoint information to be
+   * Called by commit() to retrieve the checkpoint information to be
   * passed back to the stateful operator. It will return the information for the latest
    * state store checkpointing.
    */
-  def getLatestCheckpointInfo(partitionId: Int): StateStoreCheckpointInfo = {
+  private def getLatestCheckpointInfo: StateStoreCheckpointInfo = {
     StateStoreCheckpointInfo(
       partitionId,
       loadedVersion,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index ee11ae6cfae8..15794cada675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -230,10 +230,12 @@ private[sql] class RocksDBStateStoreProvider
       }
     }
 
+    var checkpointInfo: Option[StateStoreCheckpointInfo] = None
     override def commit(): Long = synchronized {
       try {
        verify(state == UPDATING, "Cannot commit after already committed or aborted")
-        val newVersion = rocksDB.commit()
+        val (newVersion, newCheckpointInfo) = rocksDB.commit()
+        checkpointInfo = Some(newCheckpointInfo)
         state = COMMITTED
         logInfo(log"Committed ${MDC(VERSION_NUM, newVersion)} " +
           log"for ${MDC(STATE_STORE_ID, id)}")
@@ -335,8 +337,11 @@ private[sql] class RocksDBStateStoreProvider
     }
 
     override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = {
-      val checkpointInfo = rocksDB.getLatestCheckpointInfo(id.partitionId)
-      checkpointInfo
+      checkpointInfo match {
+        case Some(info) => info
+        case None => throw StateStoreErrors.stateStoreOperationOutOfOrder(
+          "Cannot get checkpointInfo without committing the store")
+      }
     }
 
     override def hasCommitted: Boolean = state == COMMITTED
@@ -526,7 +531,7 @@ private[sql] class RocksDBStateStoreProvider
     val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
    val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr)
    new RocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr,
-      useColumnFamilies, storeConf.enableStateStoreCheckpointIds)
+      useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId)
   }
 
  private val keyValueEncoderMap = new java.util.concurrent.ConcurrentHashMap[String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 5b6b5764099e..b3bfce752fcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -217,6 +217,10 @@ object StateStoreErrors {
     StateStoreFailedToGetChangelogWriter = {
     new StateStoreFailedToGetChangelogWriter(version, e)
   }
+
+  def stateStoreOperationOutOfOrder(errorMsg: String): StateStoreOperationOutOfOrder = {
+    new StateStoreOperationOutOfOrder(errorMsg)
+  }
 }
 
 class StateStoreDuplicateStateVariableDefined(stateVarName: String)
@@ -435,3 +439,9 @@ class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
   extends SparkUnsupportedOperationException(
    errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
     messageParameters = Map("inputClass" -> inputClass))
+
+class StateStoreOperationOutOfOrder(errorMsg: String)
+  extends SparkRuntimeException(
+    errorClass = "STATE_STORE_OPERATION_OUT_OF_ORDER",
+    messageParameters = Map("errorMsg" -> errorMsg)
+  )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index da4f685aaff8..ffbeaead9512 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
+import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
@@ -1112,4 +1113,32 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
       )
     }
   }
+
+  test("checkpointFormatVersion2 racing commits don't return incorrect checkpointInfo") {
+    val sqlConf = new SQLConf()
+    sqlConf.setConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, 2)
+
+    withTempDir { checkpointDir =>
+      val provider = new CkptIdCollectingStateStoreProviderWrapper()
+      provider.init(
+        StateStoreId(checkpointDir.toString, 0, 0),
+        StateStoreTestsHelper.keySchema,
+        StateStoreTestsHelper.valueSchema,
+        PrefixKeyScanStateEncoderSpec(StateStoreTestsHelper.keySchema, 1),
+        useColumnFamilies = false,
+        new StateStoreConf(sqlConf),
+        new Configuration
+      )
+
+      val store1 = provider.getStore(0)
+      val store1NewVersion = store1.commit()
+      val store2 = provider.getStore(1)
+      val store2NewVersion = store2.commit()
+      val store1CheckpointInfo = store1.getStateStoreCheckpointInfo()
+      val store2CheckpointInfo = store2.getStateStoreCheckpointInfo()
+
+      assert(store1CheckpointInfo.batchVersion == store1NewVersion)
+      assert(store2CheckpointInfo.batchVersion == store2NewVersion)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 475f8b116830..9a79f3fa0ae8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -2656,7 +2656,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
         db.put("a", "5")
         db.put("b", "5")
 
-        curVersion = db.commit()
+        curVersion = db.commit()._1
 
         assert(db.metricsOpt.get.numUncommittedKeys === 2)
         assert(db.metricsOpt.get.numCommittedKeys === 2)
@@ -2672,7 +2672,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
         db.put("b", "7")
         db.put("c", "7")
 
-        curVersion = db.commit()
+        curVersion = db.commit()._1
 
         assert(db.metricsOpt.get.numUncommittedKeys === -1)
         assert(db.metricsOpt.get.numCommittedKeys === -1)
@@ -2688,7 +2688,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
         db.put("c", "8")
         db.put("d", "8")
 
-        curVersion = db.commit()
+        curVersion = db.commit()._1
 
         assert(db.metricsOpt.get.numUncommittedKeys === 4)
         assert(db.metricsOpt.get.numCommittedKeys === 4)
@@ -3523,7 +3523,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
       }
     }
 
-    override def commit(): Long = {
+    override def commit(): (Long, StateStoreCheckpointInfo) = {
       val ret = super.commit()
       // update versionToUniqueId from lineageManager
       lineageManager.getLineageForCurrVersion().foreach {

