This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new fe1818698811 [SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set
fe1818698811 is described below

commit fe181869881147857e286612452d30fb109b48a0
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Sat May 17 09:43:30 2025 +0900

    [SPARK-52188] Fix for StateDataSource where StreamExecution.RUN_ID_KEY is not set

    ### What changes were proposed in this pull request?

    Set StreamExecution.RUN_ID_KEY in `StateStoreProvider.createAndInit`.

    ### Why are the changes needed?

    Trying to use the new state data source to read a checkpoint written with RocksDB fails with
    "assert failed: Failed to find query id/batch Id in task context". This happens here:
    https://github.com/apache/spark/blob/v4.0.0-rc6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L801
    This doesn't fail in tests, since the assertion specifically allows this case when testing.
    The RUN_ID_KEY never gets set on the path that loads the state store for the state data source.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Unit tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50924 from ericm-db/sds-fix.

    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
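For context, a minimal sketch of the scenario this fixes, assuming a checkpoint previously
written by a streaming query using the RocksDB state store provider (the checkpoint path and
the `spark` session are hypothetical placeholders, not from this commit):

    // Batch-read operator state from a streaming checkpoint via the state
    // data source. Before this fix, this read path reached
    // RocksDBStateStoreProvider.getRunId without StreamExecution.RUN_ID_KEY
    // set on the Hadoop configuration, tripping the assertion outside tests.
    val stateDf = spark.read
      .format("statestore")
      .load("/tmp/streaming-query/checkpoint") // hypothetical checkpoint dir

    stateDf.show()

With the change below, `StateStoreProvider.createAndInit` stamps StreamExecution.RUN_ID_KEY
onto the Hadoop configuration before initializing any provider, so the batch read path above
satisfies the now-unconditional assertion in `getRunId`.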
---
 .../execution/streaming/state/RocksDBStateStoreProvider.scala    | 9 ++-------
 .../apache/spark/sql/execution/streaming/state/StateStore.scala  | 2 +-
 .../v2/state/StateDataSourceChangeDataReadSuite.scala            | 8 ++++++--
 .../datasources/v2/state/StateDataSourceReadSuite.scala          | 7 +++++--
 .../state/RocksDBStateStoreCheckpointFormatV2Suite.scala         | 4 +++-
 .../sql/execution/streaming/state/RocksDBStateStoreSuite.scala   | 3 ++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala    | 1 +
 .../spark/sql/execution/streaming/state/ValueStateSuite.scala    | 3 ++-
 8 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 735e8d567b87..4d63d31d4f62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
-import java.util.UUID
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.util.control.NonFatal
@@ -795,12 +794,8 @@ object RocksDBStateStoreProvider {
 
   private def getRunId(hadoopConf: Configuration): String = {
     val runId = hadoopConf.get(StreamExecution.RUN_ID_KEY)
-    if (runId != null) {
-      runId
-    } else {
-      assert(Utils.isTesting, "Failed to find query id/batch Id in task context")
-      UUID.randomUUID().toString
-    }
+    assert(runId != null)
+    runId
   }
 
   // Native operation latencies report as latency in microseconds
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 8ba3fc37162c..00716a69df83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -529,6 +529,7 @@ object StateStoreProvider {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean,
       stateSchemaProvider: Option[StateSchemaProvider]): StateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, providerId.queryRunId.toString)
     val provider = create(storeConf.providerClass)
     provider.init(providerId.storeId, keySchema, valueSchema, keyStateEncoderSpec,
       useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey, stateSchemaProvider)
@@ -822,7 +823,6 @@ object StateStore extends Logging {
     if (version < 0) {
       throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
     }
-    hadoopConf.set(StreamExecution.RUN_ID_KEY, storeProviderId.queryRunId.toString)
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf, useMultipleValuesPerKey,
       stateSchemaBroadcast)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
index 59c0af8afd19..8ee6d8762404 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.v2.state
 
+import java.util.UUID
+
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -71,6 +73,8 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
    */
  private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -78,7 +82,7 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index fca7d16012ce..56a6a1e641f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
 import java.nio.ByteOrder
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
@@ -28,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
@@ -588,6 +589,8 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
    */
   private def getNewStateStoreProvider(checkpointDir: String): StateStoreProvider = {
     val provider = newStateStoreProvider()
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       StateStoreId(checkpointDir, 0, 0),
       keySchema,
@@ -595,7 +598,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
       NoPrefixKeyStateEncoderSpec(keySchema),
       useColumnFamilies = false,
       StateStoreConf(spark.sessionState.conf),
-      new Configuration)
+      conf)
     provider
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index b671fe6260cd..3e73c347b26d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -26,7 +27,7 @@ import org.scalatest.Tag
 import org.apache.spark.{SparkContext, SparkException, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, StreamExecution}
 import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -154,6 +155,7 @@ class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
       hadoopConf: Configuration,
       useMultipleValuesPerKey: Boolean = false,
       stateSchemaProvider: Option[StateSchemaProvider] = None): Unit = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     innerProvider.init(
       stateStoreId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 5aea0077e2aa..a51c85776be8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamExecution}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -2097,6 +2097,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       useMultipleValuesPerKey: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     val testStateSchemaProvider = new TestStateSchemaProvider
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId,
       keySchema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 08648148b4af..109cc9fd5daa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1020,6 +1020,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       numOfVersToRetainInMemory: Int = SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
       hadoopConf: Configuration = new Configuration): HDFSBackedStateStoreProvider = {
+    hadoopConf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     val sqlConf = getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)
     val provider = new HDFSBackedStateStoreProvider()
     provider.init(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 8af42d6dec26..e78698e2e308 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
-import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, ValueStateImplWithTTL}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StreamExecution, ValueStateImplWithTTL}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -460,6 +460,7 @@ abstract class StateVariableSuiteBase extends SharedSparkSession
       conf: Configuration = new Configuration,
       useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
     provider.init(
       storeId, schemaForKeyRow, schemaForValueRow, keyStateEncoderSpec, useColumnFamilies,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org