This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c09ac2373e1c [SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set
c09ac2373e1c is described below

commit c09ac2373e1cbebfb9875fd9ff0cba092fc08cf2
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Sat May 17 09:43:30 2025 +0900

    [SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set
    
    ### What changes were proposed in this pull request?
    Set `StreamExecution.RUN_ID_KEY` on the Hadoop configuration in `StateStoreProvider.createAndInit`, so it is populated on every provider-creation path.
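
    Condensed from the diff below, the relevant new lines in `createAndInit` are:

    ```scala
    // Tag the Hadoop conf with the query run id before the provider is
    // initialized, so later calls to StateStoreProvider.getRunId can find it.
    hadoopConf.set(StreamExecution.RUN_ID_KEY, providerId.queryRunId.toString)
    val provider = create(storeConf.providerClass)
    provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec,
      useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey, stateSchemaProvider)
    ```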
    
    ### Why are the changes needed?
    
    Trying to use the new `statestore` data source to read a checkpoint written with the RocksDB provider fails with "assert failed: Failed to find query id/batch Id in task context".
    
    This happens here: https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801
    
    This doesn't fail in tests because the assertion explicitly allowed the missing key when `Utils.isTesting` was true. `StreamExecution.RUN_ID_KEY` was never set on the path that loads the state store for the `statestore` source.
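
    For reference, a minimal repro sketch (`checkpointDir` here is a placeholder for the checkpoint location of a query that used the RocksDB provider):

    ```scala
    // Read the state of an existing streaming checkpoint with the state
    // data source. Before this fix, loading the RocksDB provider on this
    // path hit "assert failed: Failed to find query id/batch Id in task context".
    val stateDf = spark.read
      .format("statestore")
      .load(checkpointDir)
    stateDf.show()
    ```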
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
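
    The updated suites seed a synthetic run id so the now-unconditional assert in `getRunId` passes; the recurring pattern (condensed from the test diffs below) is:

    ```scala
    // Seed a run id on the Hadoop conf handed to the provider under test.
    val conf = new Configuration()
    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
    provider.init(StateStoreId(checkpointDir, 0, 0), keySchema, valueSchema,
      NoPrefixKeyStateEncoderSpec(keySchema), useColumnFamilies = false,
      StateStoreConf(spark.sessionState.conf), conf)
    ```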
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50924 from ericm-db/sds-fix.
    
    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/execution/streaming/state/StateStore.scala       | 10 +++-------
 .../v2/state/StateDataSourceChangeDataReadSuite.scala          |  8 ++++++--
 .../datasources/v2/state/StateDataSourceReadSuite.scala        |  7 +++++--
 .../state/RocksDBStateStoreCheckpointFormatV2Suite.scala       |  4 +++-
 .../sql/execution/streaming/state/RocksDBStateStoreSuite.scala |  3 ++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala  |  1 +
 .../spark/sql/execution/streaming/state/ValueStateSuite.scala  |  3 ++-
 7 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 3e14d02b73da..9a85169ad451 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -630,6 +630,7 @@ object StateStoreProvider extends Logging {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean,
       stateSchemaProvider: Option[StateSchemaProvider]): StateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, providerId.queryRunId.toString)
     val provider = create(storeConf.providerClass)
     provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec,
       useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey, stateSchemaProvider)
@@ -669,12 +670,8 @@ object StateStoreProvider extends Logging {
    */
   private[state] def getRunId(hadoopConf: Configuration): String = {
     val runId = hadoopConf.get(StreamExecution.RUN_ID_KEY)
-    if (runId != null) {
-      runId
-    } else {
-      assert(Utils.isTesting, "Failed to find query id/batch Id in task context")
-      UUID.randomUUID().toString
-    }
+    assert(runId != null)
+    runId
   }
 
   /**
@@ -968,7 +965,6 @@ object StateStore extends Logging {
     if (version < 0) {
       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
     }
-    hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey,
       stateSchemaBroadcast)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
index 59c0af8afd19..8ee6d8762404 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.v2.state
 
+import java.util.UUID
+
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -71,6 +73,8 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -78,7 +82,7 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index fca7d16012ce..56a6a1e641f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
 import java.nio.ByteOrder
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
@@ -28,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -588,6 +589,8 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -595,7 +598,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index 22150ffde5db..fd317903be96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -26,7 +27,7 @@ import org.scalatest.Tag
 import org.apache.spark.{SparkContext, SparkException, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -154,6 +155,7 @@ class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean = false,
       stateSchemaProvider: Option[StateSchemaProvider] = None): Unit = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     innerProvider.init(
       stateStoreId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index b13508682188..99d7e255f951 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamExecution}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -2098,6 +2098,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       useMultipleValuesPerKey: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     val testStateSchemaProvider = new TestStateSchemaProvider
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 4226ee94e98d..aa4a50b853a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1058,6 +1058,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
       hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     val sqlConf = getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)
     val provider = new HDFSBackedStateStoreProvider()
     provider.init(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 093e8b991cc9..909e888a3dd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
-import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StreamExecution, ValueStateImplWithTTL}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -461,6 +461,7 @@ abstract class StateVariableSuiteBase extends SharedSparkSession
       conf: Configuration = new Configuration,
       useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec,
       useColumnFamilies,

