This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0deb4cb09ff7 [SPARK-51409][SS] Add error classification in the 
changelog writer creation path
0deb4cb09ff7 is described below

commit 0deb4cb09ff768a75f81cd3b4007485b1c10a327
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Tue Mar 11 15:36:48 2025 +0900

    [SPARK-51409][SS] Add error classification in the changelog writer creation 
path
    
    ### What changes were proposed in this pull request?
    Add error classification in the changelog writer creation path
    
    ### Why are the changes needed?
    We have seen some transient errors thrown around file creation, running out 
of disk space, etc., which don't get classified with the right sub-classification. 
This change should fix that issue in this path.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ```
    [info] Run completed in 8 minutes, 51 seconds.
    [info] Total number of tests run: 306
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 306, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50176 from anishshri-db/task/SPARK-51409.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-conditions.json |  5 ++
 .../streaming/state/RocksDBFileManager.scala       | 74 ++++++++++++----------
 .../streaming/state/StateStoreErrors.scala         | 11 ++++
 3 files changed, 58 insertions(+), 32 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index e88c8aa68173..2ae54ef5f305 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -304,6 +304,11 @@
           "Error reading streaming state file of <fileToRead> does not exist. 
If the stream job is restarted with a new or updated state operation, please 
create a new checkpoint location or clear the existing checkpoint location."
         ]
       },
+      "FAILED_TO_GET_CHANGELOG_WRITER" : {
+        "message" : [
+          "Failed to get the changelog writer for state store at version 
<version>."
+        ]
+      },
       "HDFS_STORE_PROVIDER_OUT_OF_MEMORY" : {
         "message" : [
           "Could not load HDFS state store with id <stateStoreId> because of 
an out of memory exception."
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index bb1198dfccaf..c586d0c845ad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -178,40 +178,50 @@ class RocksDBFileManager(
       checkpointUniqueId: Option[String] = None,
       stateStoreCheckpointIdLineage: Option[Array[LineageItem]] = None
     ): StateStoreChangelogWriter = {
-    val changelogFile = dfsChangelogFile(version, checkpointUniqueId)
-    if (!rootDirChecked) {
-      val rootDir = new Path(dfsRootDir)
-      if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
-      rootDirChecked = true
-    }
+    try {
+      val changelogFile = dfsChangelogFile(version, checkpointUniqueId)
+      if (!rootDirChecked) {
+        val rootDir = new Path(dfsRootDir)
+        if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
+        rootDirChecked = true
+      }
 
-    val enableStateStoreCheckpointIds = checkpointUniqueId.isDefined
-    val changelogVersion = getChangelogWriterVersion(
-      useColumnFamilies, enableStateStoreCheckpointIds)
-
-    val changelogWriter = changelogVersion match {
-      case 1 =>
-        new StateStoreChangelogWriterV1(fm, changelogFile, codec)
-      case 2 =>
-        new StateStoreChangelogWriterV2(fm, changelogFile, codec)
-      case 3 =>
-        assert(enableStateStoreCheckpointIds && 
stateStoreCheckpointIdLineage.isDefined,
-          "StateStoreChangelogWriterV3 should only be initialized when " +
-            "state store checkpoint unique id is enabled")
-        new StateStoreChangelogWriterV3(fm, changelogFile, codec, 
stateStoreCheckpointIdLineage.get)
-      case 4 =>
-        assert(enableStateStoreCheckpointIds && 
stateStoreCheckpointIdLineage.isDefined,
-          "StateStoreChangelogWriterV4 should only be initialized when " +
-            "state store checkpoint unique id is enabled")
-        new StateStoreChangelogWriterV4(fm, changelogFile, codec, 
stateStoreCheckpointIdLineage.get)
-      case _ =>
-        throw 
QueryExecutionErrors.invalidChangeLogWriterVersion(changelogVersion)
-    }
+      val enableStateStoreCheckpointIds = checkpointUniqueId.isDefined
+      val changelogVersion = getChangelogWriterVersion(
+        useColumnFamilies, enableStateStoreCheckpointIds)
+
+      val changelogWriter = changelogVersion match {
+        case 1 =>
+          new StateStoreChangelogWriterV1(fm, changelogFile, codec)
+        case 2 =>
+          new StateStoreChangelogWriterV2(fm, changelogFile, codec)
+        case 3 =>
+          assert(enableStateStoreCheckpointIds && 
stateStoreCheckpointIdLineage.isDefined,
+            "StateStoreChangelogWriterV3 should only be initialized when " +
+              "state store checkpoint unique id is enabled")
+          new StateStoreChangelogWriterV3(fm, changelogFile, codec,
+            stateStoreCheckpointIdLineage.get)
+        case 4 =>
+          assert(enableStateStoreCheckpointIds && 
stateStoreCheckpointIdLineage.isDefined,
+            "StateStoreChangelogWriterV4 should only be initialized when " +
+              "state store checkpoint unique id is enabled")
+          new StateStoreChangelogWriterV4(fm, changelogFile, codec,
+            stateStoreCheckpointIdLineage.get)
+        case _ =>
+          throw 
QueryExecutionErrors.invalidChangeLogWriterVersion(changelogVersion)
+      }
 
-    logInfo(log"Loaded change log reader version " +
-      log"${MDC(LogKeys.FILE_VERSION, changelogWriter.version)}")
+      logInfo(log"Loaded change log reader version " +
+        log"${MDC(LogKeys.FILE_VERSION, changelogWriter.version)}")
 
-    changelogWriter
+      changelogWriter
+    } catch {
+      case e: SparkException
+        if 
Option(e.getCondition).exists(_.contains("CANNOT_LOAD_STATE_STORE")) =>
+          throw e
+      case e: Throwable =>
+        throw StateStoreErrors.failedToGetChangelogWriter(version, e)
+    }
   }
 
   // Get the changelog file at version
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 188306e82f68..5b6b5764099e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -212,6 +212,11 @@ object StateStoreErrors {
     StateStoreInvalidVariableTypeChange = {
     new StateStoreInvalidVariableTypeChange(stateName, oldType, newType)
   }
+
+  def failedToGetChangelogWriter(version: Long, e: Throwable):
+    StateStoreFailedToGetChangelogWriter = {
+    new StateStoreFailedToGetChangelogWriter(version, e)
+  }
 }
 
 class StateStoreDuplicateStateVariableDefined(stateVarName: String)
@@ -410,6 +415,12 @@ class StateStoreSnapshotPartitionNotFound(
       "operatorId" -> operatorId.toString,
       "checkpointLocation" -> checkpointLocation))
 
+class StateStoreFailedToGetChangelogWriter(version: Long, e: Throwable)
+  extends SparkRuntimeException(
+    errorClass = "CANNOT_LOAD_STATE_STORE.FAILED_TO_GET_CHANGELOG_WRITER",
+    messageParameters = Map("version" -> version.toString),
+    cause = e)
+
 class StateStoreKeyRowFormatValidationFailure(errorMsg: String)
   extends SparkRuntimeException(
     errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to