This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 23785d33acab [SPARK-51904][SS] Removing async metadata purging for 
StateSchemaV3 and ignoring non-batch files when listing OperatorMetadata files
23785d33acab is described below

commit 23785d33acab4a7a9a0825d874b4f72e4aa19374
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Sat Apr 26 09:08:55 2025 +0900

    [SPARK-51904][SS] Removing async metadata purging for StateSchemaV3 and 
ignoring non-batch files when listing OperatorMetadata files
    
    ### What changes were proposed in this pull request?
    
    Currently, we don't want to purge StateSchemaV3 files, so we need to remove 
the relevant call from MicrobatchExecution.
    Additionally, we want to ignore any files in the metadata or state schema 
directory that don't have a Long (which would cause a parse exception)
    
    ### Why are the changes needed?
    
    The changes are needed because we cannot purge schema files because these 
are necessary until full rewrite is implemented.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50700 from ericm-db/remove-async-purge.
    
    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/OperatorStateMetadata.scala    |  29 ++-
 .../streaming/TransformWithStateAvroSuite.scala    | 194 ++++++++++++++++++++-
 .../sql/streaming/TransformWithStateSuite.scala    |  22 ++-
 3 files changed, 234 insertions(+), 11 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index e60927742c63..befa3fb81722 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -332,9 +332,12 @@ class OperatorStateMetadataV2Reader(
     if (!fm.exists(offsetLog)) {
       return Array.empty
     }
+    // Offset Log files are numeric so we want to skip any files that don't
+    // conform to this
     fm.list(offsetLog)
       .filter(f => !f.getPath.getName.startsWith(".")) // ignore hidden files
-      .map(_.getPath.getName.toLong).sorted
+      .flatMap(f => scala.util.Try(f.getPath.getName.toLong).toOption)
+      .sorted
   }
 
   // List the available batches in the operator metadata directory
@@ -342,7 +345,11 @@ class OperatorStateMetadataV2Reader(
     if (!fm.exists(metadataDirPath)) {
       return Array.empty
     }
-    fm.list(metadataDirPath).map(_.getPath.getName.toLong).sorted
+
+    // filter out non-numeric file names (as OperatorStateMetadataV2 file 
names are numeric)
+    fm.list(metadataDirPath)
+      .flatMap(f => scala.util.Try(f.getPath.getName.toLong).toOption)
+      .sorted
   }
 
   override def read(): Option[OperatorStateMetadata] = {
@@ -407,6 +414,8 @@ class OperatorStateMetadataV2FileManager(
     if (thresholdBatchId != 0) {
       val earliestBatchIdKept = deleteMetadataFiles(thresholdBatchId)
       // we need to delete everything from 0 to (earliestBatchIdKept - 1), 
inclusive
+      // TODO: [SPARK-50845]: Currently, deleteSchemaFiles is a no-op since 
earliestBatchIdKept
+      // is always 0, and the earliest schema file to 'keep' is -1.
       deleteSchemaFiles(earliestBatchIdKept - 1)
     }
   }
@@ -418,11 +427,19 @@ class OperatorStateMetadataV2FileManager(
     commitLog.listBatchesOnDisk.headOption.getOrElse(0L)
   }
 
+  // TODO: [SPARK-50845]: Currently, deleteSchemaFiles is a no-op since 
thresholdBatchId
+  // is always -1
   private def deleteSchemaFiles(thresholdBatchId: Long): Unit = {
+    if (thresholdBatchId <= 0) {
+      return
+    }
+    // StateSchemaV3 filenames are of the format {batchId}_{UUID}
+    // so we want to filter for files that do not have this format
     val schemaFiles = fm.list(stateSchemaPath).sorted.map(_.getPath)
     val filesBeforeThreshold = schemaFiles.filter { path =>
-      val batchIdInPath = path.getName.split("_").head.toLong
-      batchIdInPath <= thresholdBatchId
+      scala.util.Try(path.getName.split("_").head.toLong)
+        .toOption
+        .exists(_ <= thresholdBatchId)
     }
     filesBeforeThreshold.foreach { path =>
       fm.delete(path)
@@ -460,8 +477,8 @@ class OperatorStateMetadataV2FileManager(
       }
     }
 
-    // TODO: Implement state schema file purging logic once we have
-    // enabled full-rewrite.
+    // TODO: [SPARK-50845]: Return earliest schema file we need after 
implementing
+    // full-rewrite
     0
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
index bddcece85486..ce0f2113eac5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
@@ -17,16 +17,22 @@
 
 package org.apache.spark.sql.streaming
 
+import org.apache.hadoop.fs.Path
 import org.scalactic.source.Position
 import org.scalatest.Tag
+import org.scalatest.matchers.must.Matchers.be
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+import org.scalatest.time.{Seconds, Span}
 
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
-import org.apache.spark.sql.execution.streaming.MemoryStream
-import 
org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, 
StateStoreInvalidValueSchemaEvolution, 
StateStoreValueSchemaEvolutionThresholdExceeded}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, 
MemoryStream, MicroBatchExecution}
+import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
+import 
org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV2, 
RocksDBStateStoreProvider, StateStoreInvalidValueSchemaEvolution, 
StateStoreValueSchemaEvolutionThresholdExceeded}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.StructType
 
 class TransformWithStateAvroSuite extends TransformWithStateSuite {
 
@@ -264,6 +270,190 @@ class TransformWithStateAvroSuite extends 
TransformWithStateSuite {
     }
   }
 
+  test("transformWithState - verify schema files are retained through multiple 
evolutions") {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
+      SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
+      withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+        val metadataPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+
+        // Start with initial basic state schema
+        val inputData = MemoryStream[String]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new DefaultValueInitialProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "a"),
+          CheckNewAnswer(("a", BasicState("a".hashCode, "a"))),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val fm = CheckpointFileManager.create(new Path(chkptDir.toString),
+          hadoopConf)
+        fm.mkdirs(new Path(new Path(chkptDir.toString, DIR_NAME_OFFSETS),
+          "dummy_path_name"))
+        fm.mkdirs(
+          new Path(OperatorStateMetadataV2.metadataDirPath(
+            new Path(new Path(new Path(chkptDir.toString), "state"), "0")
+          ),
+            "dummy_path_name")
+        )
+        val dummySchemaPath =
+          new Path(stateSchemaPath, "__dummy_file_path")
+        fm.mkdirs(dummySchemaPath)
+
+
+        // Capture initial schema files (after first schema evolution)
+        val initialSchemaFiles = getFiles(stateSchemaPath).length
+        assert(initialSchemaFiles > 0, "Expected schema files after initial 
run")
+
+        // Second run with evolved state (adding fields)
+        val result2 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new DefaultValueEvolvedProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "b"),
+          CheckNewAnswer(("b", EvolvedState("b".hashCode, "b", 100L, true, 
99.9))),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after second evolution
+        val afterAddingFieldsSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterAddingFieldsSchemaFiles > initialSchemaFiles,
+          s"Expected more schema files after adding fields," +
+            s" but had $initialSchemaFiles before and 
$afterAddingFieldsSchemaFiles after")
+
+        // Third run with TwoLongs schema
+        val result3 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RunningCountStatefulProcessorTwoLongs(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "c"),
+          CheckNewAnswer(("c", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after third evolution
+        val afterTwoLongsSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterTwoLongsSchemaFiles > afterAddingFieldsSchemaFiles,
+          "Expected more schema files after TwoLongs schema change")
+
+        // Fourth run with ReorderedLongs schema
+        val result4 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new 
RunningCountStatefulProcessorReorderedFields(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result4, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "d"),
+          CheckNewAnswer(("d", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after fourth evolution
+        val afterReorderedSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterReorderedSchemaFiles > afterTwoLongsSchemaFiles,
+          "Expected more schema files after ReorderedLongs schema change")
+
+        // Fifth run with RenamedFields schema
+        val result5 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RenameEvolvedProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result5, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "e"),
+          CheckNewAnswer(("e", "1")),
+          // Run multiple batches to trigger maintenance
+          AddData(inputData, "f"),
+          CheckNewAnswer(("f", "1")),
+          AddData(inputData, "g"),
+          CheckNewAnswer(("g", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Verify metadata files were purged with MIN_BATCHES_TO_RETAIN=1
+        val finalMetadataFiles = getFiles(metadataPath).length
+        // We expect the dummy folder and 2 metadata files
+        assert(finalMetadataFiles <= 3,
+          s"Expected metadata files to be purged to at most 3, but found 
$finalMetadataFiles")
+
+        // Verify schema files were NOT purged despite aggressive metadata 
purging
+        val schemaFiles = getFiles(stateSchemaPath).map(_.getPath.getName)
+        val finalSchemaFiles = schemaFiles.length
+        assert(finalSchemaFiles >= 5,
+          s"Expected at least 5 schema files to be retained" +
+            s" (one per schema evolution), but found $finalSchemaFiles")
+        assert(schemaFiles.contains(dummySchemaPath.getName))
+
+        // Verify we can read historical state for different batches
+        // This should work even though metadata may have been purged for 
earlier batches
+        val latestStateDf = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .load()
+
+        assert(latestStateDf.count() > 0, "Expected to read current state 
data")
+
+        // Check schema of latest state - should have RenamedFields schema 
structure
+        val latestValueField = latestStateDf.schema.fields.find(_.name == 
"value").get
+        val latestValueType = 
latestValueField.dataType.asInstanceOf[StructType]
+
+        // Should have value4 field from RenamedFields
+        assert(latestValueType.fields.exists(f => f.name == "value4"),
+          "Expected renamed schema with value4 field")
+      }
+    }
+  }
+
   test("transformWithState - adding field should succeed") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index bda26da88679..ec17adde2bf5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.streaming._
+import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.timestamp_seconds
 import org.apache.spark.sql.internal.SQLConf
@@ -1830,6 +1831,21 @@ abstract class TransformWithStateSuite extends 
StateStoreMetricsTest
           CheckNewAnswer(("a", "1")),
           StopStream
         )
+
+        // Here we are writing non-metadata files to the operator metadata 
directory to ensure that
+        // they are ignored during restart.
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val fm = CheckpointFileManager.create(new Path(checkpointDir.toString),
+          hadoopConf)
+        fm.mkdirs(new Path(new Path(checkpointDir.toString, DIR_NAME_OFFSETS),
+          "dummy_path_name"))
+        fm.mkdirs(
+          new Path(OperatorStateMetadataV2.metadataDirPath(
+            new Path(new Path(new Path(checkpointDir.toString), "state"), "0")
+          ),
+            "dummy_path_name")
+        )
+
         val result2 = inputData.toDS()
           .groupByKey(x => x)
           .transformWithState(new 
RunningCountStatefulProcessorWithProcTimeTimer(),
@@ -1886,17 +1902,17 @@ abstract class TransformWithStateSuite extends 
StateStoreMetricsTest
     }
   }
 
-  private def getFiles(path: Path): Array[FileStatus] = {
+  private[sql] def getFiles(path: Path): Array[FileStatus] = {
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fileManager = CheckpointFileManager.create(path, hadoopConf)
     fileManager.list(path)
   }
 
-  private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+  private[sql] def getStateSchemaPath(stateCheckpointPath: Path): Path = {
     new Path(stateCheckpointPath, "_stateSchema/default/")
   }
 
-  // TODO: [SPARK-50845] Re-enable tests after StateSchemaV3 threshold change
+  // TODO: [SPARK-50845] Re-enable tests after full-rewrite is enabled.
   ignore("transformWithState - verify that metadata and schema logs are 
purged") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to