This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0deb4cb09ff7 [SPARK-51409][SS] Add error classification in the changelog writer creation path 0deb4cb09ff7 is described below commit 0deb4cb09ff768a75f81cd3b4007485b1c10a327 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Tue Mar 11 15:36:48 2025 +0900 [SPARK-51409][SS] Add error classification in the changelog writer creation path ### What changes were proposed in this pull request? Add error classification in the changelog writer creation path ### Why are the changes needed? We have seen some transient errors thrown around file creation, running out of disk space, etc., which don't get classified with the right sub-classification. This change should fix that issue in this path. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests ``` [info] Run completed in 8 minutes, 51 seconds. [info] Total number of tests run: 306 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 306, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #50176 from anishshri-db/task/SPARK-51409. 
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-conditions.json | 5 ++ .../streaming/state/RocksDBFileManager.scala | 74 ++++++++++++---------- .../streaming/state/StateStoreErrors.scala | 11 ++++ 3 files changed, 58 insertions(+), 32 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index e88c8aa68173..2ae54ef5f305 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -304,6 +304,11 @@ "Error reading streaming state file of <fileToRead> does not exist. If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location." ] }, + "FAILED_TO_GET_CHANGELOG_WRITER" : { + "message" : [ + "Failed to get the changelog writer for state store at version <version>." + ] + }, "HDFS_STORE_PROVIDER_OUT_OF_MEMORY" : { "message" : [ "Could not load HDFS state store with id <stateStoreId> because of an out of memory exception." 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index bb1198dfccaf..c586d0c845ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization -import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.{SparkConf, SparkEnv, SparkException} import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext} import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.errors.QueryExecutionErrors @@ -178,40 +178,50 @@ class RocksDBFileManager( checkpointUniqueId: Option[String] = None, stateStoreCheckpointIdLineage: Option[Array[LineageItem]] = None ): StateStoreChangelogWriter = { - val changelogFile = dfsChangelogFile(version, checkpointUniqueId) - if (!rootDirChecked) { - val rootDir = new Path(dfsRootDir) - if (!fm.exists(rootDir)) fm.mkdirs(rootDir) - rootDirChecked = true - } + try { + val changelogFile = dfsChangelogFile(version, checkpointUniqueId) + if (!rootDirChecked) { + val rootDir = new Path(dfsRootDir) + if (!fm.exists(rootDir)) fm.mkdirs(rootDir) + rootDirChecked = true + } - val enableStateStoreCheckpointIds = checkpointUniqueId.isDefined - val changelogVersion = getChangelogWriterVersion( - useColumnFamilies, enableStateStoreCheckpointIds) - - val changelogWriter = changelogVersion match { - case 1 => - new StateStoreChangelogWriterV1(fm, changelogFile, codec) - case 2 => - new StateStoreChangelogWriterV2(fm, changelogFile, codec) - case 3 => - assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined, - "StateStoreChangelogWriterV3 should 
only be initialized when " + - "state store checkpoint unique id is enabled") - new StateStoreChangelogWriterV3(fm, changelogFile, codec, stateStoreCheckpointIdLineage.get) - case 4 => - assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined, - "StateStoreChangelogWriterV4 should only be initialized when " + - "state store checkpoint unique id is enabled") - new StateStoreChangelogWriterV4(fm, changelogFile, codec, stateStoreCheckpointIdLineage.get) - case _ => - throw QueryExecutionErrors.invalidChangeLogWriterVersion(changelogVersion) - } + val enableStateStoreCheckpointIds = checkpointUniqueId.isDefined + val changelogVersion = getChangelogWriterVersion( + useColumnFamilies, enableStateStoreCheckpointIds) + + val changelogWriter = changelogVersion match { + case 1 => + new StateStoreChangelogWriterV1(fm, changelogFile, codec) + case 2 => + new StateStoreChangelogWriterV2(fm, changelogFile, codec) + case 3 => + assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined, + "StateStoreChangelogWriterV3 should only be initialized when " + + "state store checkpoint unique id is enabled") + new StateStoreChangelogWriterV3(fm, changelogFile, codec, + stateStoreCheckpointIdLineage.get) + case 4 => + assert(enableStateStoreCheckpointIds && stateStoreCheckpointIdLineage.isDefined, + "StateStoreChangelogWriterV4 should only be initialized when " + + "state store checkpoint unique id is enabled") + new StateStoreChangelogWriterV4(fm, changelogFile, codec, + stateStoreCheckpointIdLineage.get) + case _ => + throw QueryExecutionErrors.invalidChangeLogWriterVersion(changelogVersion) + } - logInfo(log"Loaded change log reader version " + - log"${MDC(LogKeys.FILE_VERSION, changelogWriter.version)}") + logInfo(log"Loaded change log reader version " + + log"${MDC(LogKeys.FILE_VERSION, changelogWriter.version)}") - changelogWriter + changelogWriter + } catch { + case e: SparkException + if 
Option(e.getCondition).exists(_.contains("CANNOT_LOAD_STATE_STORE")) => + throw e + case e: Throwable => + throw StateStoreErrors.failedToGetChangelogWriter(version, e) + } } // Get the changelog file at version diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala index 188306e82f68..5b6b5764099e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala @@ -212,6 +212,11 @@ object StateStoreErrors { StateStoreInvalidVariableTypeChange = { new StateStoreInvalidVariableTypeChange(stateName, oldType, newType) } + + def failedToGetChangelogWriter(version: Long, e: Throwable): + StateStoreFailedToGetChangelogWriter = { + new StateStoreFailedToGetChangelogWriter(version, e) + } } class StateStoreDuplicateStateVariableDefined(stateVarName: String) @@ -410,6 +415,12 @@ class StateStoreSnapshotPartitionNotFound( "operatorId" -> operatorId.toString, "checkpointLocation" -> checkpointLocation)) +class StateStoreFailedToGetChangelogWriter(version: Long, e: Throwable) + extends SparkRuntimeException( + errorClass = "CANNOT_LOAD_STATE_STORE.FAILED_TO_GET_CHANGELOG_WRITER", + messageParameters = Map("version" -> version.toString), + cause = e) + class StateStoreKeyRowFormatValidationFailure(errorMsg: String) extends SparkRuntimeException( errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org