This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 285489b02250 [SPARK-48957][SS] Return sub-classified error class on state store load for HDFS and RocksDB providers
285489b02250 is described below
commit 285489b0225004e918b6e937f7367e492292815e
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue Jul 23 17:18:52 2024 +0900
[SPARK-48957][SS] Return sub-classified error class on state store load for HDFS and RocksDB providers
### What changes were proposed in this pull request?
Return a sub-classified error class on state store load failures for the HDFS and RocksDB providers.
### Why are the changes needed?
Without this change, all higher-level functions saw the exception and error class as the generic `CANNOT_LOAD_STATE_STORE.UNCATEGORIZED`, even when a more specific sub-class was available.
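The fix is the same guard in both providers: rethrow a `SparkException` that already carries a `CANNOT_LOAD_STATE_STORE.*` error class, and only wrap anything else in the generic error. A condensed sketch of the pattern (method context elided; `loadMap` stands in for the provider-specific load call):
```scala
try {
  loadMap(version) // may throw a SparkException that is already sub-classified
} catch {
  // Preserve an already sub-classified error class such as
  // CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION instead of re-wrapping it.
  case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
    throw e
  // Everything else still falls back to the generic UNCATEGORIZED wrapper.
  case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
```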
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Augmented unit tests
```
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true) [...]
[info] Run completed in 4 minutes, 12 seconds.
[info] Total number of tests run: 176
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
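The augmented tests assert the sub-classified error class directly on the intercepted exception rather than on its cause; condensed from the suite changes in the diff below:
```scala
val ex = intercept[SparkException] {
  provider.getStore(-1) // load with an invalid (negative) version
}
checkError(
  ex,
  errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
  parameters = Map("version" -> "-1")
)
```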
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47431 from anishshri-db/task/SPARK-48957.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../state/HDFSBackedStateStoreProvider.scala | 4 +-
.../state/RocksDBStateStoreProvider.scala | 6 ++-
.../execution/streaming/state/RocksDBSuite.scala | 8 ++--
.../streaming/state/StateStoreSuite.scala | 52 ++++++++++++++--------
4 files changed, 46 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2ec36166f9f2..7affdad79632 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -282,6 +282,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
newMap
}
catch {
+ case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+ throw e
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index a5a8d27116ce..cf582090b5d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.CompressionCodec
@@ -374,6 +374,8 @@ private[sql] class RocksDBStateStoreProvider
new RocksDBStateStore(version)
}
catch {
+ case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+ throw e
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}
@@ -387,6 +389,8 @@ private[sql] class RocksDBStateStoreProvider
new RocksDBStateStore(version)
}
catch {
+ case e: SparkException if e.getErrorClass.contains("CANNOT_LOAD_STATE_STORE") =>
+ throw e
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 90331b8a098f..b09e562d566b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -201,16 +201,16 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
checkError(
ex,
- errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
- parameters = Map.empty
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+ parameters = Map("version" -> "-1")
)
ex = intercept[SparkException] {
provider.getReadStore(-1)
}
checkError(
ex,
- errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
- parameters = Map.empty
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+ parameters = Map("version" -> "-1")
)
val remoteDir = Utils.createTempDir().toString
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 1e7bffe7ca4d..46b971f7efe4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -363,7 +363,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
getData(provider, snapshotVersion - 1)
}
checkError(
- e.getCause.asInstanceOf[SparkThrowable],
+ e,
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
parameters = Map(
"fileToRead" ->
s"${provider.stateStoreId.storeCheckpointLocation()}/1.delta",
@@ -1089,14 +1089,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
var e = intercept[SparkException] {
provider.getStore(2)
}
- assert(e.getCause.isInstanceOf[SparkException])
- assert(e.getCause.getMessage.contains("does not exist"))
+ assert(e.getMessage.contains("does not exist"))
e = intercept[SparkException] {
getData(provider, 2, useColumnFamilies = colFamiliesEnabled)
}
- assert(e.getCause.isInstanceOf[SparkException])
- assert(e.getCause.getMessage.contains("does not exist"))
+ assert(e.getMessage.contains("does not exist"))
// New updates to the reloaded store with new version, and does not change old version
tryWithProviderResource(newStoreProvider(store.id, colFamiliesEnabled)) { reloadedProvider =>
@@ -1236,19 +1234,37 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
testWithAllCodec(s"getStore with invalid versions") { colFamiliesEnabled =>
tryWithProviderResource(newStoreProvider(colFamiliesEnabled)) { provider =>
- def checkInvalidVersion(version: Int): Unit = {
+ def checkInvalidVersion(version: Int, isHDFSBackedStoreProvider: Boolean): Unit = {
val e = intercept[SparkException] {
provider.getStore(version)
}
- checkError(
- e,
- errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
- parameters = Map.empty
- )
+ if (version < 0) {
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+ parameters = Map("version" -> version.toString)
+ )
+ } else {
+ if (isHDFSBackedStoreProvider) {
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+ parameters = Map("fileToRead" -> ".*", "clazz" -> ".*"),
+ matchPVals = true
+ )
+ } else {
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+ parameters = Map("fileToRead" -> ".*"),
+ matchPVals = true
+ )
+ }
+ }
}
- checkInvalidVersion(-1)
- checkInvalidVersion(1)
+ checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+ checkInvalidVersion(1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
val store = provider.getStore(0)
put(store, "a", 0, 1)
@@ -1258,8 +1274,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
val store1_ = provider.getStore(1)
assert(rowPairsToDataSet(store1_.iterator()) === Set(("a", 0) -> 1))
- checkInvalidVersion(-1)
- checkInvalidVersion(2)
+ checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+ checkInvalidVersion(2, provider.isInstanceOf[HDFSBackedStateStoreProvider])
// Update store version with some data
val store1 = provider.getStore(1)
@@ -1268,8 +1284,8 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(store1.commit() === 2)
assert(rowPairsToDataSet(store1.iterator()) === Set(("a", 0) -> 1, ("b", 0) -> 1))
- checkInvalidVersion(-1)
- checkInvalidVersion(3)
+ checkInvalidVersion(-1, provider.isInstanceOf[HDFSBackedStateStoreProvider])
+ checkInvalidVersion(3, provider.isInstanceOf[HDFSBackedStateStoreProvider])
}
}
@@ -1444,7 +1460,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
storeConf, hadoopConf)
}
checkError(
- e.getCause.asInstanceOf[SparkThrowable],
+ e,
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
parameters = Map(
"fileToRead" -> s"$dir/0/0/1.delta",