This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4765c15a442e [SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition 4765c15a442e is described below commit 4765c15a442ee92022b28398ba52136b8e05082d Author: Livia Zhu <livia....@databricks.com> AuthorDate: Thu Mar 27 12:24:39 2025 +0900 [SPARK-51573][SS] Fix Streaming State Checkpoint v2 checkpointInfo race condition ### What changes were proposed in this pull request? Return StateStoreCheckpointInfo as part of RocksDB.commit() and store it locally in the RocksDBStateStore so that RocksDBStateStore.getStateStoreCheckpointInfo() always returns the checkpoint info belonging to that store's own commit. ### Why are the changes needed? Fixes the bug explained in SPARK-51573. Previously, getStateStoreCheckpointInfo() read the latest checkpoint info from the provider's shared RocksDB instance, so if another store committed in between, a task could receive the checkpointInfo of a different commit. This race condition will result in tasks getting incorrect checkpointInfo, which is a correctness bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a new unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50344 from liviazhu-db/liviazhu-db/checkpointinfo-race. 
Authored-by: Livia Zhu <livia....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-conditions.json | 6 +++++ .../sql/execution/streaming/state/RocksDB.scala | 14 +++++------ .../state/RocksDBStateStoreProvider.scala | 13 +++++++--- .../streaming/state/StateStoreErrors.scala | 10 ++++++++ .../RocksDBStateStoreCheckpointFormatV2Suite.scala | 29 ++++++++++++++++++++++ .../execution/streaming/state/RocksDBSuite.scala | 8 +++--- 6 files changed, 65 insertions(+), 15 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 6417ed22344e..417170d4b744 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4859,6 +4859,12 @@ ], "sqlState" : "42802" }, + "STATE_STORE_OPERATION_OUT_OF_ORDER" : { + "message" : [ + "Streaming stateful operator attempted to access state store out of order. This is a bug, please retry. error_msg=<errorMsg>" + ], + "sqlState" : "XXKST" + }, "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" : { "message" : [ "The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplay.", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index aa8e96bac046..d4bf9d31617b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -60,11 +60,10 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple * * @note This class is not thread-safe, so use it only from one thread. 
* @see [[RocksDBFileManager]] to see how the files are laid out in local disk and DFS. - * @param dfsRootDir Remote directory where checkpoints are going to be written * @param conf Configuration for RocksDB + * @param stateStoreId StateStoreId for the state store * @param localRootDir Root directory in local disk that is used to working and checkpointing dirs * @param hadoopConf Hadoop configuration for talking to the remote file system - * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs */ class RocksDB( dfsRootDir: String, @@ -73,7 +72,8 @@ class RocksDB( hadoopConf: Configuration = new Configuration, loggingId: String = "", useColumnFamilies: Boolean = false, - enableStateStoreCheckpointIds: Boolean = false) extends Logging { + enableStateStoreCheckpointIds: Boolean = false, + partitionId: Int = 0) extends Logging { import RocksDB._ @@ -991,7 +991,7 @@ class RocksDB( * - Create a RocksDB checkpoint in a new local dir * - Sync the checkpoint dir files to DFS */ - def commit(): Long = { + def commit(): (Long, StateStoreCheckpointInfo) = { val newVersion = loadedVersion + 1 try { logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}") @@ -1060,7 +1060,7 @@ class RocksDB( recordedMetrics = Some(metrics) logInfo(log"Committed ${MDC(LogKeys.VERSION_NUM, newVersion)}, " + log"stats = ${MDC(LogKeys.METRICS_JSON, recordedMetrics.get.json)}") - loadedVersion + (loadedVersion, getLatestCheckpointInfo) } catch { case t: Throwable => loadedVersion = -1 // invalidate loaded version @@ -1228,11 +1228,11 @@ class RocksDB( def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache) /** - * Called by RocksDBStateStoreProvider to retrieve the checkpoint information to be + * Called by commit() to retrieve the checkpoint information to be * passed back to the stateful operator. It will return the information for the latest * state store checkpointing. 
*/ - def getLatestCheckpointInfo(partitionId: Int): StateStoreCheckpointInfo = { + private def getLatestCheckpointInfo: StateStoreCheckpointInfo = { StateStoreCheckpointInfo( partitionId, loadedVersion, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index ee11ae6cfae8..15794cada675 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -230,10 +230,12 @@ private[sql] class RocksDBStateStoreProvider } } + var checkpointInfo: Option[StateStoreCheckpointInfo] = None override def commit(): Long = synchronized { try { verify(state == UPDATING, "Cannot commit after already committed or aborted") - val newVersion = rocksDB.commit() + val (newVersion, newCheckpointInfo) = rocksDB.commit() + checkpointInfo = Some(newCheckpointInfo) state = COMMITTED logInfo(log"Committed ${MDC(VERSION_NUM, newVersion)} " + log"for ${MDC(STATE_STORE_ID, id)}") @@ -335,8 +337,11 @@ private[sql] class RocksDBStateStoreProvider } override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = { - val checkpointInfo = rocksDB.getLatestCheckpointInfo(id.partitionId) - checkpointInfo + checkpointInfo match { + case Some(info) => info + case None => throw StateStoreErrors.stateStoreOperationOutOfOrder( + "Cannot get checkpointInfo without committing the store") + } } override def hasCommitted: Boolean = state == COMMITTED @@ -526,7 +531,7 @@ private[sql] class RocksDBStateStoreProvider val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr) new RocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr, - useColumnFamilies, 
storeConf.enableStateStoreCheckpointIds) + useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId) } private val keyValueEncoderMap = new java.util.concurrent.ConcurrentHashMap[String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 5b6b5764099e..b3bfce752fcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -217,6 +217,10 @@ object StateStoreErrors { StateStoreFailedToGetChangelogWriter = { new StateStoreFailedToGetChangelogWriter(version, e) } + + def stateStoreOperationOutOfOrder(errorMsg: String): StateStoreOperationOutOfOrder = { + new StateStoreOperationOutOfOrder(errorMsg) + } } class StateStoreDuplicateStateVariableDefined(stateVarName: String) @@ -435,3 +439,9 @@ class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String) extends SparkUnsupportedOperationException( errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY", messageParameters = Map("inputClass" -> inputClass)) + +class StateStoreOperationOutOfOrder(errorMsg: String) + extends SparkRuntimeException( + errorClass = "STATE_STORE_OPERATION_OUT_OF_ORDER", + messageParameters = Map("errorMsg" -> errorMsg) + ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala index da4f685aaff8..ffbeaead9512 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala @@ -27,6 +27,7 @@ import org.apache.spark.{SparkContext, TaskContext} import org.apache.spark.sql.{DataFrame, ForeachWriter} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream} +import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper import org.apache.spark.sql.functions.count import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ @@ -1112,4 +1113,32 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest ) } } + + test("checkpointFormatVersion2 racing commits don't return incorrect checkpointInfo") { + val sqlConf = new SQLConf() + sqlConf.setConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, 2) + + withTempDir { checkpointDir => + val provider = new CkptIdCollectingStateStoreProviderWrapper() + provider.init( + StateStoreId(checkpointDir.toString, 0, 0), + StateStoreTestsHelper.keySchema, + StateStoreTestsHelper.valueSchema, + PrefixKeyScanStateEncoderSpec(StateStoreTestsHelper.keySchema, 1), + useColumnFamilies = false, + new StateStoreConf(sqlConf), + new Configuration + ) + + val store1 = provider.getStore(0) + val store1NewVersion = store1.commit() + val store2 = provider.getStore(1) + val store2NewVersion = store2.commit() + val store1CheckpointInfo = store1.getStateStoreCheckpointInfo() + val store2CheckpointInfo = store2.getStateStoreCheckpointInfo() + + assert(store1CheckpointInfo.batchVersion == store1NewVersion) + assert(store2CheckpointInfo.batchVersion == store2NewVersion) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 475f8b116830..9a79f3fa0ae8 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2656,7 +2656,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.put("a", "5") db.put("b", "5") - curVersion = db.commit() + curVersion = db.commit()._1 assert(db.metricsOpt.get.numUncommittedKeys === 2) assert(db.metricsOpt.get.numCommittedKeys === 2) @@ -2672,7 +2672,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.put("b", "7") db.put("c", "7") - curVersion = db.commit() + curVersion = db.commit()._1 assert(db.metricsOpt.get.numUncommittedKeys === -1) assert(db.metricsOpt.get.numCommittedKeys === -1) @@ -2688,7 +2688,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.put("c", "8") db.put("d", "8") - curVersion = db.commit() + curVersion = db.commit()._1 assert(db.metricsOpt.get.numUncommittedKeys === 4) assert(db.metricsOpt.get.numCommittedKeys === 4) @@ -3523,7 +3523,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } - override def commit(): Long = { + override def commit(): (Long, StateStoreCheckpointInfo) = { val ret = super.commit() // update versionToUniqueId from lineageManager lineageManager.getLineageForCurrVersion().foreach { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org