This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c69e999f82e1 [SPARK-54117][SS] Throw better error to indicate that TWS 
is only supported with RocksDB state store provider
c69e999f82e1 is described below

commit c69e999f82e17ed516ff1e98fb534b23f3230dab
Author: Dmytro Fedoriaka <[email protected]>
AuthorDate: Fri Nov 7 16:43:46 2025 +0900

    [SPARK-54117][SS] Throw better error to indicate that TWS is only supported 
with RocksDB state store provider
    
    ### What changes were proposed in this pull request?
    
    Change the error message shown when a user runs TransformWithState with a 
state store provider other than the RocksDB state store provider.
    
    ### Why are the changes needed?
    
    Improves the user experience by making it clear that users need to use 
RocksDBStateStoreProvider.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it changes the error message shown when a user runs TransformWithState 
with HDFSBackedStateStoreProvider.
    
    Old error message: 
`[UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES] The feature is not 
supported:
    Creating multiple column families with HDFSBackedStateStoreProvider is not 
supported.`
    
    New error message: 
`[UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS] The feature is not 
supported:
    Store backend HDFSBackedStateStoreProvider is not supported by 
TransformWithState operator. Please use RocksDBStateStoreProvider.`
    
    ### How was this patch tested?
    
    Unit tests: TransformWithStateSuite, TransformWithStateValidationSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52822 from fedimser/fedimser/tws-rocksdb-error.
    
    Lead-authored-by: Dmytro Fedoriaka <[email protected]>
    Co-authored-by: Dmytro Fedoriaka <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../src/main/resources/error/error-conditions.json      |  5 +++++
 .../streaming/TransformWithStateInPySparkExec.scala     |  1 +
 .../transformwithstate/TransformWithStateExec.scala     |  1 +
 .../transformwithstate/TransformWithStateExecBase.scala | 17 ++++++++++++++++-
 .../execution/streaming/state/StateStoreErrors.scala    | 10 ++++++++++
 .../spark/sql/streaming/TransformWithStateSuite.scala   |  4 ++--
 6 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 3518765efd09..b16fe84ae27f 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6646,6 +6646,11 @@
           "State TTL with <stateStoreProvider> is not supported. Please use 
RocksDBStateStoreProvider."
         ]
       },
+      "STORE_BACKEND_NOT_SUPPORTED_FOR_TWS" : {
+        "message" : [
+          "Store backend <stateStoreProvider> is not supported by 
TransformWithState operator. Please use RocksDBStateStoreProvider."
+        ]
+      },
       "TABLE_OPERATION" : {
         "message" : [
           "Table <tableName> does not support <operation>. Please check the 
current catalog and namespace to make sure the qualified table name is 
expected, and also check the catalog implementation which is configured by 
\"spark.sql.catalog\"."
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
index c10d21933c2f..a1cf71844950 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala
@@ -181,6 +181,7 @@ case class TransformWithStateInPySparkExec(
    */
   override protected def doExecute(): RDD[InternalRow] = {
     metrics
+    validateStateStoreProvider(isStreaming)
 
     if (!hasInitialState) {
       if (isStreaming) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
index 52a0d470c266..cc1c3263ad74 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala
@@ -374,6 +374,7 @@ case class TransformWithStateExec(
     metrics // force lazy init at driver
 
     validateTimeMode()
+    validateStateStoreProvider(isStreaming)
 
     if (hasInitialState) {
       val storeConf = new StateStoreConf(session.sessionState.conf)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
index c2d24e735ab2..f5abe333d0f3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExecBase.scala
@@ -24,7 +24,8 @@ import 
org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorCustomMetric,
 StatefulOperatorCustomSumMetric, StatefulOperatorPartitioning, 
StateStoreWriter, WatermarkSupport}
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
-import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, 
TransformWithStateUserFunctionException}
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, 
RocksDBStateStoreProvider, StateStoreErrors, 
TransformWithStateUserFunctionException}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
 import org.apache.spark.sql.types.{BinaryType, StructType}
 import org.apache.spark.util.NextIterator
@@ -53,6 +54,12 @@ abstract class TransformWithStateExecBase(
 
   override def operatorStateMetadataVersion: Int = 2
 
+  // Supported state store providers for TransformWithState.
+  // TransformWithState currently supports only RocksDBStateStoreProvider.
+  private val SUPPORTED_STATE_STORE_PROVIDERS = Set(
+    classOf[RocksDBStateStoreProvider].getName
+  )
+
   override def supportsSchemaEvolution: Boolean = true
 
   override def left: SparkPlan = child
@@ -216,6 +223,14 @@ abstract class TransformWithStateExecBase(
     }
   }
 
+  /** Validates that the configured state store provider is supported by 
TransformWithState. */
+  protected def validateStateStoreProvider(isStreaming: Boolean): Unit = {
+    val providerName = conf.getConf(SQLConf.STATE_STORE_PROVIDER_CLASS)
+    if (isStreaming && 
!SUPPORTED_STATE_STORE_PROVIDERS.contains(providerName)) {
+      throw StateStoreErrors.storeBackendNotSupportedForTWS(providerName)
+    }
+  }
+
   /**
    * Executes a block of code with standardized error handling for 
StatefulProcessor operations.
    * Rethrows SparkThrowables directly and wraps other exceptions in
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 6d211fb6fc0a..cd29f8f30f6e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -65,6 +65,11 @@ object StateStoreErrors {
     new 
StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
   }
 
+  def storeBackendNotSupportedForTWS(stateStoreProvider: String):
+    StateStoreBackendNotSupportedForTWSException = {
+    new StateStoreBackendNotSupportedForTWSException(stateStoreProvider)
+  }
+
   def cannotUseColumnFamilyWithInvalidName(operationName: String, 
colFamilyName: String):
     StateStoreCannotUseColumnFamilyWithInvalidName = {
       new StateStoreCannotUseColumnFamilyWithInvalidName(operationName, 
colFamilyName)
@@ -330,6 +335,11 @@ class 
StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider:
     errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES",
     messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
 
+class StateStoreBackendNotSupportedForTWSException(stateStoreProvider: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS",
+    messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
+
 class StateStoreCannotUseColumnFamilyWithInvalidName(operationName: String, 
colFamilyName: String)
   extends SparkUnsupportedOperationException(
     errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index aac858719196..21de9d9c1ec8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -2570,7 +2570,7 @@ class TransformWithStateValidationSuite extends 
StateStoreMetricsTest {
 
     testStream(result, OutputMode.Update())(
       AddData(inputData, "a"),
-      ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] { t 
=>
+      ExpectFailure[StateStoreBackendNotSupportedForTWSException] { t =>
         assert(t.getMessage.contains("not supported"))
       }
     )
@@ -2836,7 +2836,7 @@ class TransformWithStateValidationSuite extends 
StateStoreMetricsTest {
       )
     testStream(result, OutputMode.Update())(
       AddData(inputData, InitInputRow("a", "add", -1.0)),
-      ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] {
+      ExpectFailure[StateStoreBackendNotSupportedForTWSException] {
         (t: Throwable) => {
           assert(t.getMessage.contains("not supported"))
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to