This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c69e999f82e1 [SPARK-54117][SS] Throw better error to indicate that TWS
is only supported with RocksDB state store provider
c69e999f82e1 is described below
commit c69e999f82e17ed516ff1e98fb534b23f3230dab
Author: Dmytro Fedoriaka <[email protected]>
AuthorDate: Fri Nov 7 16:43:46 2025 +0900
[SPARK-54117][SS] Throw better error to indicate that TWS is only supported
with RocksDB state store provider
### What changes were proposed in this pull request?
Changes the error message shown when a user runs TransformWithState with a state
store provider other than the RocksDB state store provider.
### Why are the changes needed?
Improves the user experience by making it clear to users that they need to use
RocksDBStateStoreProvider.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the error message shown when a user runs TransformWithState with
HDFSBackedStateStoreProvider.
Old error message:
`[UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES] The feature is not
supported:
Creating multiple column families with HDFSBackedStateStoreProvider is not
supported.`
New error message:
`[UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS] The feature is not
supported:
Store backend HDFSBackedStateStoreProvider is not supported by
TransformWithState operator. Please use RocksDBStateStoreProvider.`
### How was this patch tested?
Unit tests: TransformWithStateSuite, TransformWithStateValidationSuite.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52822 from fedimser/fedimser/tws-rocksdb-error.
Lead-authored-by: Dmytro Fedoriaka <[email protected]>
Co-authored-by: Dmytro Fedoriaka <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 5 +++++
.../streaming/TransformWithStateInPySparkExec.scala | 1 +
.../transformwithstate/TransformWithStateExec.scala | 1 +
.../transformwithstate/TransformWithStateExecBase.scala | 17 ++++++++++++++++-
.../execution/streaming/state/StateStoreErrors.scala | 10 ++++++++++
.../spark/sql/streaming/TransformWithStateSuite.scala | 4 ++--
6 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 3518765efd09..b16fe84ae27f 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6646,6 +6646,11 @@
"State TTL with <stateStoreProvider> is not supported. Please use
RocksDBStateStoreProvider."
]
},
+ "STORE_BACKEND_NOT_SUPPORTED_FOR_TWS" : {
+ "message" : [
+ "Store backend <stateStoreProvider> is not supported by
TransformWithState operator. Please use RocksDBStateStoreProvider."
+ ]
+ },
"TABLE_OPERATION" : {
"message" : [
"Table <tableName> does not support <operation>. Please check the
current catalog and namespace to make sure the qualified table name is
expected, and also check the catalog implementation which is configured by
\"spark.sql.catalog\"."
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
index c10d21933c2f..a1cf71844950 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
@@ -181,6 +181,7 @@ case class TransformWithStateInPySparkExec(
*/
override protected def doExecute(): RDD[InternalRow] = {
metrics
+ validateStateStoreProvider(isStreaming)
if (!hasInitialState) {
if (isStreaming) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
index 52a0d470c266..cc1c3263ad74 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
@@ -374,6 +374,7 @@ case class TransformWithStateExec(
metrics // force lazy init at driver
validateTimeMode()
+ validateStateStoreProvider(isStreaming)
if (hasInitialState) {
val storeConf = new StateStoreConf(session.sessionState.conf)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
index c2d24e735ab2..f5abe333d0f3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
@@ -24,7 +24,8 @@ import
org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorCustomMetric,
StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning,
StateStoreWriter, WatermarkSupport}
import
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
-import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata,
TransformWithStateUserFunctionException}
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata,
RocksDBStateStoreProvider, StateStoreErrors,
TransformWithStateUserFunctionException}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
import org.apache.spark.sql.types.{BinaryType, StructType}
import org.apache.spark.util.NextIterator
@@ -53,6 +54,12 @@ abstract class TransformWithStateExecBase(
override def operatorStateMetadataVersion: Int = 2
+ // Supported state store providers for TransformWithState.
+ // TransformWithState currently supports only RocksDBStateStoreProvider.
+ private val SUPPORTED_STATE_STORE_PROVIDERS = Set(
+ classOf[RocksDBStateStoreProvider].getName
+ )
+
override def supportsSchemaEvolution: Boolean = true
override def left: SparkPlan = child
@@ -216,6 +223,14 @@ abstract class TransformWithStateExecBase(
}
}
+ /** Validates that the configured state store provider is supported by
TransformWithState. */
+ protected def validateStateStoreProvider(isStreaming: Boolean): Unit = {
+ val providerName = conf.getConf(SQLConf.STATE_STORE_PROVIDER_CLASS)
+ if (isStreaming &&
!SUPPORTED_STATE_STORE_PROVIDERS.contains(providerName)) {
+ throw StateStoreErrors.storeBackendNotSupportedForTWS(providerName)
+ }
+ }
+
/**
* Executes a block of code with standardized error handling for
StatefulProcessor operations.
* Rethrows SparkThrowables directly and wraps other exceptions in
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 6d211fb6fc0a..cd29f8f30f6e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -65,6 +65,11 @@ object StateStoreErrors {
new
StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
}
+ def storeBackendNotSupportedForTWS(stateStoreProvider: String):
+ StateStoreBackendNotSupportedForTWSException = {
+ new StateStoreBackendNotSupportedForTWSException(stateStoreProvider)
+ }
+
def cannotUseColumnFamilyWithInvalidName(operationName: String,
colFamilyName: String):
StateStoreCannotUseColumnFamilyWithInvalidName = {
new StateStoreCannotUseColumnFamilyWithInvalidName(operationName,
colFamilyName)
@@ -330,6 +335,11 @@ class
StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider:
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES",
messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
+class StateStoreBackendNotSupportedForTWSException(stateStoreProvider: String)
+ extends SparkUnsupportedOperationException(
+ errorClass = "UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS",
+ messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
+
class StateStoreCannotUseColumnFamilyWithInvalidName(operationName: String,
colFamilyName: String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index aac858719196..21de9d9c1ec8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -2570,7 +2570,7 @@ class TransformWithStateValidationSuite extends
StateStoreMetricsTest {
testStream(result, OutputMode.Update())(
AddData(inputData, "a"),
- ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] { t
=>
+ ExpectFailure[StateStoreBackendNotSupportedForTWSException] { t =>
assert(t.getMessage.contains("not supported"))
}
)
@@ -2836,7 +2836,7 @@ class TransformWithStateValidationSuite extends
StateStoreMetricsTest {
)
testStream(result, OutputMode.Update())(
AddData(inputData, InitInputRow("a", "add", -1.0)),
- ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] {
+ ExpectFailure[StateStoreBackendNotSupportedForTWSException] {
(t: Throwable) => {
assert(t.getMessage.contains("not supported"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]