This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 41b34d6be494 [SPARK-55493][SS] Do not mkdirs in streaming checkpoint
state directory in StateDataSource
41b34d6be494 is described below
commit 41b34d6be494168ef4dfe40091001c0ba28bdf44
Author: Livia Zhu <[email protected]>
AuthorDate: Fri Feb 20 15:51:26 2026 -0800
[SPARK-55493][SS] Do not mkdirs in streaming checkpoint state directory in
StateDataSource
### What changes were proposed in this pull request?
Previously, we tried to create a new directory for the state directory in the
checkpoint directory if it did not exist when running `StateDataSource`. This
change introduces a new read-only mode so that data sources do not need to call mkdirs.
### Why are the changes needed?
Allow usage of StateDataSource on checkpoints that are read-only.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit tests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude opus 4.6
Closes #54277 from liviazhu/liviazhu-db/stds-fix.
Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../datasources/v2/state/StateDataSource.scala | 2 +-
.../v2/state/StreamStreamJoinStateHelper.scala | 7 +-
.../v2/state/metadata/StateMetadataSource.scala | 6 +-
.../streaming/state/OperatorStateMetadata.scala | 19 ++-
.../state/StateSchemaCompatibilityChecker.scala | 14 +-
.../v2/state/StateDataSourceReadSuite.scala | 157 +++++++++++++++++++++
.../v2/state/StateDataSourceTestBase.scala | 27 +++-
7 files changed, 216 insertions(+), 16 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 9ccbb9a649f2..07b756006525 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -379,7 +379,7 @@ class StateDataSource extends TableProvider with
DataSourceRegister with Logging
partitionId, sourceOptions.storeName)
val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
val manager = new StateSchemaCompatibilityChecker(providerId,
hadoopConf,
- oldSchemaFilePaths = oldSchemaFilePaths)
+ oldSchemaFilePaths = oldSchemaFilePaths, createSchemaDir = false)
val stateSchema = manager.readSchemaFile()
if (sourceOptions.internalOnlyReadAllColumnFamilies) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
index 5cb38022159c..78648a01a6f6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
@@ -94,12 +94,13 @@ object StreamStreamJoinStateHelper {
// read the key schema from the keyToNumValues store for the join keys
val manager = new StateSchemaCompatibilityChecker(
- providerIdForKeyToNumValues, newHadoopConf, oldSchemaFilePaths)
+ providerIdForKeyToNumValues, newHadoopConf, oldSchemaFilePaths,
+ createSchemaDir = false)
val kSchema = manager.readSchemaFile().head.keySchema
// read the value schema from the keyWithIndexToValue store for the
values
val manager2 = new
StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,
- newHadoopConf, oldSchemaFilePaths)
+ newHadoopConf, oldSchemaFilePaths, createSchemaDir = false)
val vSchema = manager2.readSchemaFile().head.valueSchema
(kSchema, vSchema)
@@ -109,7 +110,7 @@ object StreamStreamJoinStateHelper {
val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
val manager = new StateSchemaCompatibilityChecker(
- providerId, newHadoopConf, oldSchemaFilePaths)
+ providerId, newHadoopConf, oldSchemaFilePaths, createSchemaDir =
false)
val kSchema = manager.readSchemaFile().find { schema =>
schema.colFamilyName == storeNames(0)
}.map(_.keySchema).get
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
index 31e6ac30a598..cef3a7a7e5cd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.v2.state.metadata
+import java.io.FileNotFoundException
import java.util
import scala.jdk.CollectionConverters._
@@ -222,7 +223,8 @@ class StateMetadataPartitionReader(
1
}
OperatorStateMetadataReader.createReader(
- operatorIdPath, hadoopConf, operatorStateMetadataVersion,
batchId).read() match {
+ operatorIdPath, hadoopConf, operatorStateMetadataVersion, batchId,
+ createMetadataDir = false).read() match {
case Some(metadata) => metadata
case None => throw
StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
batchId)
@@ -231,7 +233,7 @@ class StateMetadataPartitionReader(
} catch {
// if the operator metadata is not present, catch the exception
// and return an empty array
- case ex: Exception =>
+ case ex: FileNotFoundException =>
logWarning(log"Failed to find operator metadata for " +
log"path=${MDC(LogKeys.CHECKPOINT_LOCATION, checkpointLocation)} " +
log"with exception=${MDC(LogKeys.EXCEPTION, ex)}")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index 6b2295da03b9..ac0f42c34007 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -199,12 +199,14 @@ object OperatorStateMetadataReader {
stateCheckpointPath: Path,
hadoopConf: Configuration,
version: Int,
- batchId: Long): OperatorStateMetadataReader = {
+ batchId: Long,
+ createMetadataDir: Boolean = true): OperatorStateMetadataReader = {
version match {
case 1 =>
new OperatorStateMetadataV1Reader(stateCheckpointPath, hadoopConf)
case 2 =>
- new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf,
batchId)
+ new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf,
batchId,
+ createMetadataDir)
case _ =>
throw new IllegalArgumentException(s"Failed to create reader for
operator metadata " +
s"with version=$version")
@@ -319,7 +321,8 @@ class OperatorStateMetadataV2Writer(
class OperatorStateMetadataV2Reader(
stateCheckpointPath: Path,
hadoopConf: Configuration,
- batchId: Long) extends OperatorStateMetadataReader {
+ batchId: Long,
+ createMetadataDir: Boolean = true) extends OperatorStateMetadataReader
with Logging {
// Check that the requested batchId is available in the checkpoint directory
val baseCheckpointDir = stateCheckpointPath.getParent.getParent
@@ -331,7 +334,12 @@ class OperatorStateMetadataV2Reader(
private val metadataDirPath =
OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
private lazy val fm = CheckpointFileManager.create(metadataDirPath,
hadoopConf)
- fm.mkdirs(metadataDirPath.getParent)
+ if (createMetadataDir && !fm.exists(metadataDirPath.getParent)) {
+ fm.mkdirs(metadataDirPath.getParent)
+ } else if (!createMetadataDir) {
+ logInfo(log"Skipping metadata directory creation (createMetadataDir=false)
" +
+ log"at ${MDC(LogKeys.CHECKPOINT_LOCATION, baseCheckpointDir.toString)}")
+ }
override def version: Int = 2
@@ -352,7 +360,8 @@ class OperatorStateMetadataV2Reader(
// List the available batches in the operator metadata directory
private def listOperatorMetadataBatches(): Array[Long] = {
- if (!fm.exists(metadataDirPath)) {
+ // If parent doesn't exist, return empty array rather than throwing an
exception
+ if (!fm.exists(metadataDirPath.getParent) || !fm.exists(metadataDirPath)) {
return Array.empty
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index ca18ce9067b3..166ec450bfbd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -81,7 +81,8 @@ class StateSchemaCompatibilityChecker(
providerId: StateStoreProviderId,
hadoopConf: Configuration,
oldSchemaFilePaths: List[Path] = List.empty,
- newSchemaFilePath: Option[Path] = None) extends Logging {
+ newSchemaFilePath: Option[Path] = None,
+ createSchemaDir: Boolean = true) extends Logging {
// For OperatorStateMetadataV1: Only one schema file present per operator
// per query
@@ -96,7 +97,12 @@ class StateSchemaCompatibilityChecker(
private val fm = CheckpointFileManager.create(schemaFileLocation, hadoopConf)
- fm.mkdirs(schemaFileLocation.getParent)
+ if (createSchemaDir && !fm.exists(schemaFileLocation.getParent)) {
+ fm.mkdirs(schemaFileLocation.getParent)
+ } else if (!createSchemaDir) {
+ logInfo(log"Skipping schema directory creation (createSchemaDir=false) " +
+ log"at ${MDC(LogKeys.CHECKPOINT_LOCATION,
schemaFileLocation.getParent.toString)}")
+ }
private val conf =
SparkSession.getActiveSession.map(_.sessionState.conf).getOrElse(new SQLConf())
@@ -112,7 +118,7 @@ class StateSchemaCompatibilityChecker(
def readSchemaFiles(): Map[String, List[StateStoreColFamilySchema]] = {
val stateSchemaFilePaths = (oldSchemaFilePaths ++
List(schemaFileLocation)).distinct
stateSchemaFilePaths.flatMap { schemaFile =>
- if (fm.exists(schemaFile)) {
+ if (fm.exists(schemaFile.getParent) && fm.exists(schemaFile)) {
val inStream = fm.open(schemaFile)
StateSchemaCompatibilityChecker.readSchemaFile(inStream)
} else {
@@ -163,7 +169,7 @@ class StateSchemaCompatibilityChecker(
* otherwise
*/
private def getExistingKeyAndValueSchema(): List[StateStoreColFamilySchema]
= {
- if (fm.exists(schemaFileLocation)) {
+ if (fm.exists(schemaFileLocation.getParent) &&
fm.exists(schemaFileLocation)) {
readSchemaFile()
} else {
List.empty
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 526d39478b91..ce29c87bc76e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, TimeMode,
TransformWithStateSuiteUtils}
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
import testImplicits._
@@ -1501,3 +1502,159 @@ abstract class StateDataSourceReadSuite extends
StateDataSourceTestBase with Ass
}
}
}
+
+/**
+ * Test suite that verifies the state data source reader does not create empty
state
+ * directories when reading state for all stateful operators.
+ */
+class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase {
+
+ /**
+ * Asserts that the cause chain of the given exception contains
+ * an instance of the expected type.
+ */
+ private def assertCauseChainContains(
+ e: Throwable,
+ expectedType: Class[_ <: Throwable]): Unit = {
+ var current: Throwable = e
+ while (current != null) {
+ if (expectedType.isInstance(current)) return
+ current = current.getCause
+ }
+ fail(
+ s"Expected ${expectedType.getSimpleName} in cause chain, " +
+ s"but got: ${e.getClass.getSimpleName}: ${e.getMessage}")
+ }
+
+ /**
+ * Runs a stateful query to create the checkpoint structure, deletes the
state directory,
+ * then attempts to read via the state data source and verifies that the
state directory
+ * is not recreated.
+ *
+ * @param runQuery function that runs one batch of a stateful query given a
checkpoint path
+ * @param readState function that attempts to read state given a checkpoint
path
+ * @param expectedCause the exception type expected in the cause chain
+ */
+ private def assertStateDirectoryNotRecreatedOnRead(
+ runQuery: String => Unit,
+ readState: String => Unit,
+ expectedCause: Class[_ <: Throwable] =
+ classOf[StateDataSourceReadStateSchemaFailure]): Unit = {
+ withTempDir { tempDir =>
+ val checkpointPath = tempDir.getAbsolutePath
+
+ // Step 1: Run the stateful query to create the full checkpoint structure
+ runQuery(checkpointPath)
+
+ // Step 2: Delete the state directory
+ val stateDir = new File(tempDir, "state")
+ assert(stateDir.exists(), "State directory should exist after running
the query")
+ Utils.deleteRecursively(stateDir)
+ assert(!stateDir.exists(), "State directory should be deleted")
+
+ // Step 3: Attempt to read state - expected to fail since state is
deleted
+ val e = intercept[Exception] {
+ readState(checkpointPath)
+ }
+ assertCauseChainContains(e, expectedCause)
+
+ // Step 4: Verify the state directory was NOT recreated by the reader
+ assert(!stateDir.exists(),
+ "State data source reader should not recreate the deleted state
directory")
+ }
+ }
+
+ test("streaming aggregation: no empty state dir created on read") {
+ assertStateDirectoryNotRecreatedOnRead(
+ runQuery = checkpointPath => {
+ runLargeDataStreamingAggregationQuery(checkpointPath)
+ },
+ readState = checkpointPath => {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .load()
+ .collect()
+ }
+ )
+ }
+
+ test("drop duplicates: no empty state dir created on read") {
+ assertStateDirectoryNotRecreatedOnRead(
+ runQuery = checkpointPath => {
+ runDropDuplicatesQuery(checkpointPath)
+ },
+ readState = checkpointPath => {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .load()
+ .collect()
+ }
+ )
+ }
+
+ test("flatMapGroupsWithState: no empty state dir created on read") {
+ assertStateDirectoryNotRecreatedOnRead(
+ runQuery = checkpointPath => {
+ runFlatMapGroupsWithStateQuery(checkpointPath)
+ },
+ readState = checkpointPath => {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .load()
+ .collect()
+ }
+ )
+ }
+
+ test("stream-stream join: no empty state dir created on read") {
+ assertStateDirectoryNotRecreatedOnRead(
+ runQuery = checkpointPath => {
+ runStreamStreamJoinQuery(checkpointPath)
+ },
+ readState = checkpointPath => {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .option(StateSourceOptions.JOIN_SIDE, "left")
+ .load()
+ .collect()
+ }
+ )
+ }
+
+ test("transformWithState: no empty state dir created on read") {
+ assertStateDirectoryNotRecreatedOnRead(
+ runQuery = checkpointPath => {
+ runTransformWithStateQuery(checkpointPath)
+ },
+ readState = checkpointPath => {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+ .load()
+ .collect()
+ },
+ expectedCause = classOf[IllegalArgumentException]
+ )
+ }
+
+ test("session window aggregation: no empty state dir created on read") {
+ assertStateDirectoryNotRecreatedOnRead(
+ runQuery = checkpointPath => {
+ runSessionWindowAggregationQuery(checkpointPath)
+ },
+ readState = checkpointPath => {
+ spark.read
+ .format("statestore")
+ .option(StateSourceOptions.PATH, checkpointPath)
+ .load()
+ .collect()
+ }
+ )
+ }
+
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
index 98ecdde2e571..aed484470e47 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
import
org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper.{LeftSide,
RightSide}
import
org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
-import
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec,
StateStore}
+import
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec,
RocksDBStateStoreProvider, StateStore}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
@@ -445,6 +445,31 @@ trait StateDataSourceTestBase extends StreamTest with
StateStoreMetricsTest {
)
}
+ /**
+ * Runs one batch of a transformWithState query (using
RunningCountStatefulProcessor)
+ * to create checkpoint structure with state. Uses RocksDBStateStoreProvider.
+ */
+ protected def runTransformWithStateQuery(checkpointRoot: String): Unit = {
+ withSQLConf(
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString
+ ) {
+ val inputData = MemoryStream[String]
+ val result = inputData.toDS()
+ .groupByKey(x => x)
+ .transformWithState(new
org.apache.spark.sql.streaming.RunningCountStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+
+ testStream(result, OutputMode.Update())(
+ StartStream(checkpointLocation = checkpointRoot),
+ AddData(inputData, "a"),
+ CheckNewAnswer(("a", "1")),
+ StopStream
+ )
+ }
+ }
+
/**
* Helper function to create a query that combines deduplication and
aggregation.
* This creates a more complex query with multiple stateful operators:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]