This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 285489b02250 [SPARK-48957][SS] Return sub-classified error class on state store load for HDFS and RocksDB provider
285489b02250 is described below

commit 285489b0225004e918b6e937f7367e492292815e
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue Jul 23 17:18:52 2024 +0900

    [SPARK-48957][SS] Return sub-classified error class on state store load for HDFS and RocksDB provider
    
    ### What changes were proposed in this pull request?
    Return a sub-classified error class on state store load for the HDFS and RocksDB state store providers.
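
    Concretely, both providers now rethrow an already-classified `SparkException` instead of re-wrapping it. A minimal sketch of the guard, distilled from the diff below (`loadMap` stands in for the provider-specific load path):

    ```
    try {
      loadMap(version)
    } catch {
      // Already sub-classified under CANNOT_LOAD_STATE_STORE: propagate as-is.
      case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
        throw e
      // Anything else still falls back to CANNOT_LOAD_STATE_STORE.UNCATEGORIZED.
      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
    }
    ```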
    
    ### Why are the changes needed?
    Without this change, all higher-level functions saw the exception's error class as `CANNOT_LOAD_STATE_STORE.UNCATEGORIZED`, regardless of the underlying failure.
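
    With the sub-classification, tests (and callers) can match on the specific error class; a sketch based on the updated `RocksDBSuite` assertions in the diff below:

    ```
    val ex = intercept[SparkException] {
      provider.getStore(-1)
    }
    checkError(
      ex,
      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
      parameters = Map("version" -> "-1")
    )
    ```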
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Augmented unit tests
    
    ```
    ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true) [...]
    [info] Run completed in 4 minutes, 12 seconds.
    [info] Total number of tests run: 176
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47431 from anishshri-db/task/SPARK-48957.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../state/HDFSBackedStateStoreProvider.scala       |  4 +-
 .../state/RocksDBStateStoreProvider.scala          |  6 ++-
 .../execution/streaming/state/RocksDBSuite.scala   |  8 ++--
 .../streaming/state/StateStoreSuite.scala          | 52 ++++++++++++++--------
 4 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2ec36166f9f2..7affdad79632 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -282,6 +282,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       newMap
     }
     catch {
+      case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+        throw e
       case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index a5a8d27116ce..cf582090b5d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.io.CompressionCodec
@@ -374,6 +374,8 @@ private[sql] class RocksDBStateStoreProvider
       new RocksDBStateStore(version)
     }
     catch {
+      case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+        throw e
       case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
   }
@@ -387,6 +389,8 @@ private[sql] class RocksDBStateStoreProvider
       new RocksDBStateStore(version)
     }
     catch {
+      case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+        throw e
       case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 90331b8a098f..b09e562d566b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -201,16 +201,16 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
     checkError(
       ex,
-      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
-      parameters = Map.empty
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+      parameters = Map("version" -> "-1")
     )
     ex = intercept[SparkException] {
       provider.getReadStore(-1)
     }
     checkError(
       ex,
-      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
-      parameters = Map.empty
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+      parameters = Map("version" -> "-1")
     )
 
     val remoteDir = Utils.createTempDir().toString
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 1e7bffe7ca4d..46b971f7efe4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -363,7 +363,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
         getData(provider, snapshotVersion - 1)
       }
       checkError(
-        e.getCause.asInstanceOf[SparkThrowable],
+        e,
        errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
         parameters = Map(
           "fileToRead" -> 
s"${provider.stateStoreId.storeCheckpointLocation()}/1.delta",
@@ -1089,14 +1089,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       var e = intercept[SparkException] {
         provider.getStore(2)
       }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
+      assert(e.getMessage.contains("does not exist"))
 
       e = intercept[SparkException] {
         getData(provider, 2, useColumnFamilies = colFamiliesEnabled)
       }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
+      assert(e.getMessage.contains("does not exist"))
 
       // New updates to the reloaded store with new version, and does not change old version
       tryWithProviderResource(newStoreProvider(store.id, colFamiliesEnabled)) { reloadedProvider =>
@@ -1236,19 +1234,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
 
   testWithAllCodec(s"getStore with invalid versions") { colFamiliesEnabled =>
     tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
-      def checkInvalidVersion(version: Int): Unit = {
+      def checkInvalidVersion(version: Int, isHDFSBackedStoreProvider: Boolean): Unit = {
         val e = intercept[SparkException] {
           provider.getStore(version)
         }
-        checkError(
-          e,
-          errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
-          parameters = Map.empty
-        )
+        if (version < 0) {
+          checkError(
+            e,
+            errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+            parameters = Map("version" -> version.toString)
+          )
+        } else {
+          if (isHDFSBackedStoreProvider) {
+            checkError(
+              e,
+              errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+              parameters = Map("fileToRead" -> ".*", "clazz" -> ".*"),
+              matchPVals = true
+            )
+          } else {
+            checkError(
+              e,
+              errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+              parameters = Map("fileToRead" -> ".*"),
+              matchPVals = true
+            )
+          }
+        }
       }
 
-      checkInvalidVersion(-1)
-      checkInvalidVersion(1)
+      checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+      checkInvalidVersion(1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
 
       val store = provider.getStore(0)
       put(store, "a", 0, 1)
@@ -1258,8 +1274,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       val store1_ = provider.getStore(1)
       assert(rowPairsToDataSet(store1_.iterator()) === Set(("a", 0) -> 1))
 
-      checkInvalidVersion(-1)
-      checkInvalidVersion(2)
+      checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+      checkInvalidVersion(2, provider.isInstanceOf[HDFSBackedStateStoreProvider])
 
       // Update store version with some data
       val store1 = provider.getStore(1)
@@ -1268,8 +1284,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       assert(store1.commit() === 2)
       assert(rowPairsToDataSet(store1.iterator()) === Set(("a", 0) -> 1, ("b", 0) -> 1))
 
-      checkInvalidVersion(-1)
-      checkInvalidVersion(3)
+      checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+      checkInvalidVersion(3, provider.isInstanceOf[HDFSBackedStateStoreProvider])
     }
   }
 
@@ -1444,7 +1460,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
               storeConf, hadoopConf)
           }
           checkError(
-            e.getCause.asInstanceOf[SparkThrowable],
+            e,
            errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
             parameters = Map(
               "fileToRead" -> s"$dir/0/0/1.delta",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
