This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c09ac2373e1c [SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set
c09ac2373e1c is described below

commit c09ac2373e1cbebfb9875fd9ff0cba092fc08cf2
Author:     Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Sat May 17 09:43:30 2025 +0900

[SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set

### What changes were proposed in this pull request?

Setting the StreamExecution.RUN_ID_KEY in `StateStoreProvider.createAndInit`.

### Why are the changes needed?

Trying to use the new statestore source to read a checkpoint written with RocksDB fails with "assert failed: Failed to find query id/batch Id in task context". This happens here:
https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801

This doesn't fail in unit tests, since the assertion specifically allows that case when testing. The RUN_ID_KEY never gets set on the path that loads the state store for the statestore source.
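For context, a minimal sketch of the kind of read that hits this assertion — assuming an active SparkSession `spark`, a hypothetical checkpoint directory, and the State Data Source's `statestore` reader format:

```scala
// Sketch only: read the state of a finished streaming query back as a
// DataFrame. The checkpoint is assumed to have been written with
// RocksDBStateStoreProvider. This read happens outside any streaming run,
// so before this fix nothing set StreamExecution.RUN_ID_KEY on the Hadoop
// conf, and loading the RocksDB store failed with
// "assert failed: Failed to find query id/batch Id in task context".
val stateDf = spark.read
  .format("statestore")
  .option("path", "/tmp/query/checkpoint") // hypothetical checkpoint location
  .load()

stateDf.show()
```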
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50924 from ericm-db/sds-fix.

Authored-by: Eric Marnadi <eric.marn...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/execution/streaming/state/StateStore.scala      | 10 +++-------
 .../v2/state/StateDataSourceChangeDataReadSuite.scala          |  8 ++++++--
 .../datasources/v2/state/StateDataSourceReadSuite.scala        |  7 +++++--
 .../state/RocksDBStateStoreCheckpointFormatV2Suite.scala       |  4 +++-
 .../sql/execution/streaming/state/RocksDBStateStoreSuite.scala |  3 ++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala  |  1 +
 .../spark/sql/execution/streaming/state/ValueStateSuite.scala  |  3 ++-
 7 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 3e14d02b73da..9a85169ad451 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -630,6 +630,7 @@ object StateStoreProvider extends Logging {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean,
       stateSchemaProvider: Option[StateSchemaProvider]): StateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, providerId.queryRunId.toString)
     val provider = create(storeConf.providerClass)
     provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec,
       useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey, stateSchemaProvider)
@@ -669,12 +670,8 @@ object StateStoreProvider extends Logging {
    */
   private[state] def getRunId(hadoopConf: Configuration): String = {
     val runId = hadoopConf.get(StreamExecution.RUN_ID_KEY)
-    if (runId != null) {
-      runId
-    } else {
-      assert(Utils.isTesting, "Failed to find query id/batch Id in task context")
-      UUID.randomUUID().toString
-    }
+    assert(runId != null)
+    runId
   }
 
   /**
@@ -968,7 +965,6 @@ object StateStore extends Logging {
     if (version < 0) {
       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
     }
-    hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf,
       useMultipleValuesPerKey, stateSchemaBroadcast)
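Because `getRunId` now asserts unconditionally, tests that call `provider.init` directly (bypassing `StateStoreProvider.createAndInit`) must seed a run id themselves — that is the pattern each of the test diffs below applies. A minimal sketch, with a random UUID standing in for a real query run id:

```scala
import java.util.UUID

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.streaming.StreamExecution

// Seed a run id before handing the conf to provider.init. Without this,
// getRunId's assert(runId != null) now fails even under Utils.isTesting,
// because the testing escape hatch was removed in this commit.
val conf = new Configuration
conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
// conf is then passed as the hadoopConf argument of provider.init(...)
```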
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
index 59c0af8afd19..8ee6d8762404 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.v2.state
 
+import java.util.UUID
+
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -71,6 +73,8 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -78,7 +82,7 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index fca7d16012ce..56a6a1e641f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
 import java.nio.ByteOrder
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
@@ -28,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -588,6 +589,8 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -595,7 +598,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index 22150ffde5db..fd317903be96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -26,7 +27,7 @@ import org.scalatest.Tag
 
 import org.apache.spark.{SparkContext, SparkException, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -154,6 +155,7 @@ class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean = false,
       stateSchemaProvider: Option[StateSchemaProvider] = None): Unit = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     innerProvider.init(
       stateStoreId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index b13508682188..99d7e255f951 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamExecution}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -2098,6 +2098,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       useMultipleValuesPerKey: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     val testStateSchemaProvider = new TestStateSchemaProvider
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 4226ee94e98d..aa4a50b853a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1058,6 +1058,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
       hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     val sqlConf = getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)
     val provider = new HDFSBackedStateStoreProvider()
     provider.init(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 093e8b991cc9..909e888a3dd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
-import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StreamExecution, ValueStateImplWithTTL}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -461,6 +461,7 @@ abstract class StateVariableSuiteBase extends SharedSparkSession
       conf: Configuration = new Configuration,
       useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec, useColumnFamilies,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org