This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bdcb79f23da3 [SPARK-48543][SS] Track state row validation failures
using explicit error class
bdcb79f23da3 is described below
commit bdcb79f23da3d09469910508426a54a78adcbda6
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Thu Jun 13 16:47:49 2024 +0900
[SPARK-48543][SS] Track state row validation failures using explicit error
class
### What changes were proposed in this pull request?
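Track state row validation failures using explicit error classes
(`STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE` and
`STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE`) instead of the generic
`InvalidUnsafeRowException`.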
### Why are the changes needed?
We want to track these exceptions explicitly, since they can indicate
underlying corruption or data-loss scenarios.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests, updated to expect the new exception types.
```
13:06:32.803 INFO org.apache.spark.util.ShutdownHookManager: Deleting
directory
/Users/anish.shrigondekar/spark/spark/target/tmp/spark-6d90d3f3-0f37-48b8-8506-a8cdee3d25d7
[info] Run completed in 9 seconds, 861 milliseconds.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #46885 from anishshri-db/task/SPARK-48543.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 24 ++++++++++++++++++++++
.../sql/execution/streaming/state/StateStore.scala | 18 ++++++----------
.../streaming/state/StateStoreErrors.scala | 21 ++++++++++++++++++-
.../streaming/state/StateStoreSuite.scala | 5 +++--
...reamingStateStoreFormatCompatibilitySuite.scala | 5 +++--
5 files changed, 56 insertions(+), 17 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 36d8fe1daa37..35dfa7a6c349 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3735,6 +3735,18 @@
],
"sqlState" : "42K06"
},
+ "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE" : {
+ "message" : [
+ "The streaming query failed to validate written state for key row.",
+ "The following reasons may cause this:",
+ "1. An old Spark version wrote the checkpoint that is incompatible with
the current one",
+ "2. Corrupt checkpoint files",
+ "3. The query changed in an incompatible way between restarts",
+ "For the first case, use a new checkpoint directory or use the original
Spark version",
+ "to process the streaming state. Retrieved error_message=<errorMsg>"
+ ],
+ "sqlState" : "XX000"
+ },
"STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" : {
"message" : [
"Provided key schema does not match existing state key schema.",
@@ -3769,6 +3781,18 @@
],
"sqlState" : "42802"
},
+ "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE" : {
+ "message" : [
+ "The streaming query failed to validate written state for value row.",
+ "The following reasons may cause this:",
+ "1. An old Spark version wrote the checkpoint that is incompatible with
the current one",
+ "2. Corrupt checkpoint files",
+ "3. The query changed in an incompatible way between restarts",
+ "For the first case, use a new checkpoint directory or use the original
Spark version",
+ "to process the streaming state. Retrieved error_message=<errorMsg>"
+ ],
+ "sqlState" : "XX000"
+ },
"STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE" : {
"message" : [
"Provided value schema does not match existing state value schema.",
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index b59fe65fb14a..2f9ce2c236f4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -279,16 +279,6 @@ case class StateStoreCustomTimingMetric(name: String,
desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}
-/**
- * An exception thrown when an invalid UnsafeRow is detected in state store.
- */
-class InvalidUnsafeRowException(error: String)
- extends RuntimeException("The streaming query failed by state format
invalidation. " +
- "The following reasons may cause this: 1. An old Spark version wrote the
checkpoint that is " +
- "incompatible with the current one; 2. Broken checkpoint files; 3. The
query is changed " +
- "among restart. For the first case, you can try to restart the application
without " +
- s"checkpoint or use the legacy Spark version to process the streaming
state.\n$error", null)
-
sealed trait KeyStateEncoderSpec
case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends
KeyStateEncoderSpec
@@ -434,12 +424,16 @@ object StateStoreProvider {
conf: StateStoreConf): Unit = {
if (conf.formatValidationEnabled) {
val validationError =
UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
- validationError.foreach { error => throw new
InvalidUnsafeRowException(error) }
+ validationError.foreach { error =>
+ throw StateStoreErrors.keyRowFormatValidationFailure(error)
+ }
if (conf.formatValidationCheckValue) {
val validationError =
UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow,
valueSchema)
- validationError.foreach { error => throw new
InvalidUnsafeRowException(error) }
+ validationError.foreach { error =>
+ throw StateStoreErrors.valueRowFormatValidationFailure(error)
+ }
}
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 36be4d9f5bab..205e093e755d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.streaming.state
-import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkRuntimeException,
SparkUnsupportedOperationException}
/**
* Object for grouping error messages from (most) exceptions thrown from State
API V2
@@ -39,6 +39,16 @@ object StateStoreErrors {
)
}
+ def keyRowFormatValidationFailure(errorMsg: String):
+ StateStoreKeyRowFormatValidationFailure = {
+ new StateStoreKeyRowFormatValidationFailure(errorMsg)
+ }
+
+ def valueRowFormatValidationFailure(errorMsg: String):
+ StateStoreValueRowFormatValidationFailure = {
+ new StateStoreValueRowFormatValidationFailure(errorMsg)
+ }
+
def unsupportedOperationOnMissingColumnFamily(operationName: String,
colFamilyName: String):
StateStoreUnsupportedOperationOnMissingColumnFamily = {
new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName,
colFamilyName)
@@ -245,3 +255,12 @@ class StateStoreValueSchemaNotCompatible(
"storedValueSchema" -> storedValueSchema,
"newValueSchema" -> newValueSchema))
+class StateStoreKeyRowFormatValidationFailure(errorMsg: String)
+ extends SparkRuntimeException(
+ errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",
+ messageParameters = Map("errorMsg" -> errorMsg))
+
+class StateStoreValueRowFormatValidationFailure(errorMsg: String)
+ extends SparkRuntimeException(
+ errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
+ messageParameters = Map("errorMsg" -> errorMsg))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 6a6867fbb552..98b2030f1bac 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -41,6 +41,7 @@ import
org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProj
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming._
import
org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef
+import
org.apache.spark.sql.execution.streaming.state.StateStoreValueRowFormatValidationFailure
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -1606,12 +1607,12 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
// By default, when there is an invalid pair of value row and value
schema, it should throw
val keyRow = dataToKeyRow("key", 1)
val valueRow = dataToValueRow(2)
- val e = intercept[InvalidUnsafeRowException] {
+ val e = intercept[StateStoreValueRowFormatValidationFailure] {
// Here valueRow doesn't match with prefixKeySchema
StateStoreProvider.validateStateRowFormat(
keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
}
- assert(e.getMessage.contains("The streaming query failed by state format
invalidation"))
+ assert(e.getMessage.contains("The streaming query failed to validate
written state"))
// When sqlConf.stateStoreFormatValidationEnabled is set to false and
// StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
index 3cd6b397a8b8..8a9d4d42ef2b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkException,
SparkUnsupportedOperationException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.InvalidUnsafeRowException
+import
org.apache.spark.sql.execution.streaming.state.{StateStoreKeyRowFormatValidationFailure,
StateStoreValueRowFormatValidationFailure}
import org.apache.spark.sql.functions._
import org.apache.spark.tags.SlowSQLTest
import org.apache.spark.util.Utils
@@ -254,7 +254,8 @@ class StreamingStateStoreFormatCompatibilitySuite extends
StreamTest {
private def findStateSchemaException(exc: Throwable): Boolean = {
exc match {
case _: SparkUnsupportedOperationException => true
- case _: InvalidUnsafeRowException => true
+ case _: StateStoreKeyRowFormatValidationFailure => true
+ case _: StateStoreValueRowFormatValidationFailure => true
case e1 if e1.getCause != null => findStateSchemaException(e1.getCause)
case _ => false
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]