This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4531d710ba79 [SPARK-45138][SS] Define a new error class and apply it
when checkpointing state to DFS fails
4531d710ba79 is described below
commit 4531d710ba79216c8c6626650675f83425c06fa4
Author: Neil Ramaswamy <[email protected]>
AuthorDate: Thu Sep 21 11:59:03 2023 +0900
[SPARK-45138][SS] Define a new error class and apply it when checkpointing
state to DFS fails
### What changes were proposed in this pull request?
In this change, we add a new error class for when checkpointing state to
the DFS, for either state store provider, fails during `commit`.
### Why are the changes needed?
Users might be confused when they see an `IOException`, for example, if a
call to `commit` fails. This is a neat and self-explanatory wrapper around such
exceptions.
### Does this PR introduce _any_ user-facing change?
Yes, the error message when DFS checkpoint fails is now a `SparkException`
with error class.
### How was this patch tested?
- Modified existing UTs
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42895 from neilramaswamy/nr-state-files-commit-logging.
Authored-by: Neil Ramaswamy <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-classes.json | 12 +++++
...ditions-cannot-write-state-store-error-class.md | 32 +++++++++++++
docs/sql-error-conditions.md | 8 ++++
.../spark/sql/errors/QueryExecutionErrors.scala | 7 +++
.../state/HDFSBackedStateStoreProvider.scala | 8 ++--
.../state/RocksDBStateStoreProvider.scala | 15 ++++--
.../streaming/state/RocksDBStateStoreSuite.scala | 9 +++-
.../streaming/state/StateStoreSuite.scala | 54 +++++++++++++++-------
8 files changed, 117 insertions(+), 28 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index 186e7b4640d8..d92ccfce5c52 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -314,6 +314,18 @@
"<details>"
]
},
+ "CANNOT_WRITE_STATE_STORE" : {
+ "message" : [
+ "Error writing state store files for provider <providerClass>."
+ ],
+ "subClass" : {
+ "CANNOT_COMMIT" : {
+ "message" : [
+ "Cannot perform commit during state checkpoint."
+ ]
+ }
+ }
+ },
"CAST_INVALID_INPUT" : {
"message" : [
"The value <expression> of the type <sourceType> cannot be cast to
<targetType> because it is malformed. Correct the value as per the syntax, or
change its target type. Use `try_cast` to tolerate malformed input and return
NULL instead. If necessary set <ansiConfig> to \"false\" to bypass this error."
diff --git a/docs/sql-error-conditions-cannot-write-state-store-error-class.md
b/docs/sql-error-conditions-cannot-write-state-store-error-class.md
new file mode 100644
index 000000000000..ab7b852f892b
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-write-state-store-error-class.md
@@ -0,0 +1,32 @@
+---
+layout: global
+title: CANNOT_WRITE_STATE_STORE error class
+displayTitle: CANNOT_WRITE_STATE_STORE error class
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+SQLSTATE: none assigned
+
+Error writing state store files for provider `<providerClass>`.
+
+This error class has the following derived error classes:
+
+## CANNOT_COMMIT
+
+Cannot perform commit during state checkpoint.
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 4f982e52bc86..1df00f72bc97 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -271,6 +271,14 @@ SQLSTATE: none assigned
Cannot up cast `<expression>` from `<sourceType>` to `<targetType>`.
`<details>`
+###
[CANNOT_WRITE_STATE_STORE](sql-error-conditions-cannot-write-state-store-error-class.html)
+
+SQLSTATE: none assigned
+
+Error writing state store files for provider `<providerClass>`.
+
+For more details see
[CANNOT_WRITE_STATE_STORE](sql-error-conditions-cannot-write-state-store-error-class.html)
+
### CAST_INVALID_INPUT
[SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 643cbc3cbdb9..e14fef1fad72 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2249,6 +2249,13 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
cause = f)
}
+ def failedToCommitStateFileError(providerClass: String, f: Throwable):
Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT",
+ messageParameters = Map("providerClass" -> providerClass),
+ cause = f)
+ }
+
def cannotPurgeAsBreakInternalStateError():
SparkUnsupportedOperationException = {
new SparkUnsupportedOperationException(
errorClass = "_LEGACY_ERROR_TEMP_2260",
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index afa1fdaa2237..66832400aa14 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -135,17 +135,15 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
/** Commit all the updates that have been made to the store, and return
the new version. */
override def commit(): Long = {
- verify(state == UPDATING, "Cannot commit after already committed or
aborted")
-
try {
+ verify(state == UPDATING, "Cannot commit after already committed or
aborted")
commitUpdates(newVersion, mapToUpdate, compressedStream)
state = COMMITTED
logInfo(s"Committed version $newVersion for $this to file
$finalDeltaFile")
newVersion
} catch {
- case NonFatal(e) =>
- throw new IllegalStateException(
- s"Error committing version $newVersion into $this", e)
+ case e: Throwable =>
+ throw
QueryExecutionErrors.failedToCommitStateFileError(this.toString(), e)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 37a8785f04d6..4254640201c5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -90,11 +90,16 @@ private[sql] class RocksDBStateStoreProvider
}
override def commit(): Long = synchronized {
- verify(state == UPDATING, "Cannot commit after already committed or
aborted")
- val newVersion = rocksDB.commit()
- state = COMMITTED
- logInfo(s"Committed $newVersion for $id")
- newVersion
+ try {
+ verify(state == UPDATING, "Cannot commit after already committed or
aborted")
+ val newVersion = rocksDB.commit()
+ state = COMMITTED
+ logInfo(s"Committed $newVersion for $id")
+ newVersion
+ } catch {
+ case e: Throwable =>
+ throw
QueryExecutionErrors.failedToCommitStateFileError(this.toString(), e)
+ }
}
override def abort(): Unit = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index d113085fd1c4..d1cc7e0b3b9c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -152,6 +152,10 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
newStoreProvider(storeId, numColsPrefixKey = 0)
}
+ def newStoreProvider(storeId: StateStoreId, conf: Configuration):
RocksDBStateStoreProvider = {
+ newStoreProvider(storeId, numColsPrefixKey = -1, conf = conf)
+ }
+
override def newStoreProvider(numPrefixCols: Int): RocksDBStateStoreProvider
= {
newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0),
numColsPrefixKey = numPrefixCols)
}
@@ -159,11 +163,12 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
def newStoreProvider(
storeId: StateStoreId,
numColsPrefixKey: Int,
- sqlConf: Option[SQLConf] = None): RocksDBStateStoreProvider = {
+ sqlConf: Option[SQLConf] = None,
+ conf: Configuration = new Configuration): RocksDBStateStoreProvider = {
val provider = new RocksDBStateStoreProvider()
provider.init(
storeId, keySchema, valueSchema, numColsPrefixKey = numColsPrefixKey,
- new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), new Configuration)
+ new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
provider
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 093f42620112..e6d2f63267fd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -534,21 +534,6 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
- testQuietly("SPARK-18342: commit fails when rename fails") {
- import RenameReturnsFalseFileSystem._
- val dir = scheme + "://" + newDir()
- val conf = new Configuration()
- conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
- tryWithProviderResource(newStoreProvider(
- opId = Random.nextInt, partition = 0, dir = dir, hadoopConf = conf)) {
provider =>
-
- val store = provider.getStore(0)
- put(store, "a", 0, 0)
- val e = intercept[IllegalStateException](store.commit())
- assert(e.getCause.getMessage.contains("Failed to rename"))
- }
- }
-
test("SPARK-18416: do not create temp delta file until the store is
updated") {
val dir = newDir()
val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0),
UUID.randomUUID)
@@ -692,8 +677,9 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
// Fail commit for next version and verify that reloading resets the
files
CreateAtomicTestManager.shouldFailInCreateAtomic = true
put(store, "11", 0, 11)
- val e = intercept[IllegalStateException] { quietly { store.commit() } }
+ val e = intercept[SparkException] { quietly { store.commit() } }
assert(e.getCause.isInstanceOf[IOException])
+ assert(e.getMessage.contains("Cannot perform commit"))
CreateAtomicTestManager.shouldFailInCreateAtomic = false
// Abort commit for next version and verify that reloading resets the
files
@@ -799,6 +785,14 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
newStoreProvider(storeId.operatorId, storeId.partitionId, dir =
storeId.checkpointRootLocation)
}
+ def newStoreProvider(storeId: StateStoreId, conf: Configuration):
HDFSBackedStateStoreProvider = {
+ newStoreProvider(
+ storeId.operatorId,
+ storeId.partitionId,
+ dir = storeId.checkpointRootLocation,
+ hadoopConf = conf)
+ }
+
override def newStoreProvider(
minDeltasForSnapshot: Int,
numOfVersToRetainInMemory: Int): HDFSBackedStateStoreProvider = {
@@ -1152,6 +1146,31 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
}
}
+ testQuietly("SPARK-18342: commit fails when rename fails") {
+ import RenameReturnsFalseFileSystem._
+
+ val ROCKSDB_STATE_STORE = "RocksDBStateStore"
+ val dir = scheme + "://" + newDir()
+ val conf = new Configuration()
+ conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
+
+ val storeId = StateStoreId(dir, operatorId = 0, partitionId = 0)
+ tryWithProviderResource(newStoreProvider(storeId, conf)) { provider =>
+ val store = provider.getStore(0)
+ put(store, "a", 0, 0)
+ val e = intercept[SparkException](quietly { store.commit() } )
+
+ assert(e.getErrorClass == "CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT")
+ if (store.getClass.getName contains ROCKSDB_STATE_STORE) {
+ assert(e.getMessage contains "RocksDBStateStore[id=(op=0,part=0)")
+ } else {
+ assert(e.getMessage contains "HDFSStateStore[id=(op=0,part=0)")
+ }
+ assert(e.getMessage contains "Error writing state store files")
+ assert(e.getCause.getMessage.contains("Failed to rename"))
+ }
+ }
+
// This test illustrates state store iterator behavior differences leading
to SPARK-38320.
testWithAllCodec("SPARK-38320 - state store iterator behavior differences") {
val ROCKSDB_STATE_STORE = "RocksDBStateStore"
@@ -1430,6 +1449,9 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
/** Return a new provider with the given id */
def newStoreProvider(storeId: StateStoreId): ProviderClass
+ /** Return a new provider with the given id and configuration */
+ def newStoreProvider(storeId: StateStoreId, conf: Configuration):
ProviderClass
+
/** Return a new provider with minimum delta and version to retain in memory
*/
def newStoreProvider(minDeltasForSnapshot: Int, numOfVersToRetainInMemory:
Int): ProviderClass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]