This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 23785d33acab [SPARK-51904][SS] Removing async metadata purging for StateSchemaV3 and ignoring non-batch files when listing OperatorMetadata files
23785d33acab is described below

commit 23785d33acab4a7a9a0825d874b4f72e4aa19374
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Sat Apr 26 09:08:55 2025 +0900

    [SPARK-51904][SS] Removing async metadata purging for StateSchemaV3 and ignoring non-batch files when listing OperatorMetadata files

    ### What changes were proposed in this pull request?

    Currently, we don't want to purge StateSchemaV3 files, so we remove the relevant call from MicroBatchExecution. Additionally, we now ignore any file in the metadata or state schema directory whose name does not parse as a Long, since such files would otherwise cause a parse exception.

    ### Why are the changes needed?

    The changes are needed because schema files cannot be purged yet: they remain necessary until full rewrite is implemented.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Unit tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50700 from ericm-db/remove-async-purge.

    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/OperatorStateMetadata.scala    |  29 ++-
 .../streaming/TransformWithStateAvroSuite.scala    | 194 ++++++++++++++++++++-
 .../sql/streaming/TransformWithStateSuite.scala    |  22 ++-
 3 files changed, 234 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index e60927742c63..befa3fb81722 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -332,9 +332,12 @@ class OperatorStateMetadataV2Reader(
     if (!fm.exists(offsetLog)) {
       return Array.empty
     }
+    // Offset Log files are numeric so we want to skip any files that don't
+    // conform to this
     fm.list(offsetLog)
       .filter(f => !f.getPath.getName.startsWith(".")) // ignore hidden files
-      .map(_.getPath.getName.toLong).sorted
+      .flatMap(f => scala.util.Try(f.getPath.getName.toLong).toOption)
+      .sorted
   }

   // List the available batches in the operator metadata directory
@@ -342,7 +345,11 @@ class OperatorStateMetadataV2Reader(
     if (!fm.exists(metadataDirPath)) {
       return Array.empty
     }
-    fm.list(metadataDirPath).map(_.getPath.getName.toLong).sorted
+
+    // filter out non-numeric file names (as OperatorStateMetadataV2 file names are numeric)
+    fm.list(metadataDirPath)
+      .flatMap(f => scala.util.Try(f.getPath.getName.toLong).toOption)
+      .sorted
   }

   override def read(): Option[OperatorStateMetadata] = {
@@ -407,6 +414,8 @@ class OperatorStateMetadataV2FileManager(
     if (thresholdBatchId != 0) {
       val earliestBatchIdKept = deleteMetadataFiles(thresholdBatchId)
       // we need to delete everything from 0 to (earliestBatchIdKept - 1), inclusive
+      // TODO: [SPARK-50845]: Currently, deleteSchemaFiles is a no-op since earliestBatchIdKept
+      // is always 0, and the earliest schema file to 'keep' is -1.
       deleteSchemaFiles(earliestBatchIdKept - 1)
     }
   }
@@ -418,11 +427,19 @@ class OperatorStateMetadataV2FileManager(
     commitLog.listBatchesOnDisk.headOption.getOrElse(0L)
   }

+  // TODO: [SPARK-50845]: Currently, deleteSchemaFiles is a no-op since thresholdBatchId
+  // is always -1
   private def deleteSchemaFiles(thresholdBatchId: Long): Unit = {
+    if (thresholdBatchId <= 0) {
+      return
+    }
+    // StateSchemaV3 filenames are of the format {batchId}_{UUID}
+    // so we want to filter for files that do not have this format
     val schemaFiles = fm.list(stateSchemaPath).sorted.map(_.getPath)
     val filesBeforeThreshold = schemaFiles.filter { path =>
-      val batchIdInPath = path.getName.split("_").head.toLong
-      batchIdInPath <= thresholdBatchId
+      scala.util.Try(path.getName.split("_").head.toLong)
+        .toOption
+        .exists(_ <= thresholdBatchId)
     }
     filesBeforeThreshold.foreach { path =>
       fm.delete(path)
@@ -460,8 +477,8 @@ class OperatorStateMetadataV2FileManager(
       }
     }

-    // TODO: Implement state schema file purging logic once we have
-    // enabled full-rewrite.
+    // TODO: [SPARK-50845]: Return earliest schema file we need after implementing
+    // full-rewrite
     0
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
index bddcece85486..ce0f2113eac5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala
@@ -17,16 +17,22 @@

 package org.apache.spark.sql.streaming

+import org.apache.hadoop.fs.Path
 import org.scalactic.source.Position
 import org.scalatest.Tag
+import org.scalatest.matchers.must.Matchers.be
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+import org.scalatest.time.{Seconds, Span}

 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
-import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStoreInvalidValueSchemaEvolution, StateStoreValueSchemaEvolutionThresholdExceeded}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MemoryStream, MicroBatchExecution}
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV2, RocksDBStateStoreProvider, StateStoreInvalidValueSchemaEvolution, StateStoreValueSchemaEvolutionThresholdExceeded}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.StructType

 class TransformWithStateAvroSuite extends TransformWithStateSuite {

@@ -264,6 +270,190 @@ class TransformWithStateAvroSuite extends TransformWithStateSuite {
     }
   }

+  test("transformWithState - verify schema files are retained through multiple evolutions") {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
+      SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
+      withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath =
+          getStateSchemaPath(stateOpIdPath)
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+
+        // Start with initial basic state schema
+        val inputData = MemoryStream[String]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new DefaultValueInitialProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "a"),
+          CheckNewAnswer(("a", BasicState("a".hashCode, "a"))),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val fm = CheckpointFileManager.create(new Path(chkptDir.toString),
+          hadoopConf)
+        fm.mkdirs(new Path(new Path(chkptDir.toString, DIR_NAME_OFFSETS),
+          "dummy_path_name"))
+        fm.mkdirs(
+          new Path(OperatorStateMetadataV2.metadataDirPath(
+            new Path(new Path(new Path(chkptDir.toString), "state"), "0")
+          ),
+            "dummy_path_name")
+        )
+        val dummySchemaPath =
+          new Path(stateSchemaPath, "__dummy_file_path")
+        fm.mkdirs(dummySchemaPath)
+
+
+        // Capture initial schema files (after first schema evolution)
+        val initialSchemaFiles = getFiles(stateSchemaPath).length
+        assert(initialSchemaFiles > 0, "Expected schema files after initial run")
+
+        // Second run with evolved state (adding fields)
+        val result2 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new DefaultValueEvolvedProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "b"),
+          CheckNewAnswer(("b", EvolvedState("b".hashCode, "b", 100L, true, 99.9))),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after second evolution
+        val afterAddingFieldsSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterAddingFieldsSchemaFiles > initialSchemaFiles,
+          s"Expected more schema files after adding fields," +
+            s" but had $initialSchemaFiles before and $afterAddingFieldsSchemaFiles after")
+
+        // Third run with TwoLongs schema
+        val result3 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RunningCountStatefulProcessorTwoLongs(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "c"),
+          CheckNewAnswer(("c", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after third evolution
+        val afterTwoLongsSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterTwoLongsSchemaFiles > afterAddingFieldsSchemaFiles,
+          "Expected more schema files after TwoLongs schema change")
+
+        // Fourth run with ReorderedLongs schema
+        val result4 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RunningCountStatefulProcessorReorderedFields(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result4, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "d"),
+          CheckNewAnswer(("d", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after fourth evolution
+        val afterReorderedSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterReorderedSchemaFiles > afterTwoLongsSchemaFiles,
+          "Expected more schema files after ReorderedLongs schema change")
+
+        // Fifth run with RenamedFields schema
+        val result5 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RenameEvolvedProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result5, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "e"),
+          CheckNewAnswer(("e", "1")),
+          // Run multiple batches to trigger maintenance
+          AddData(inputData, "f"),
+          CheckNewAnswer(("f", "1")),
+          AddData(inputData, "g"),
+          CheckNewAnswer(("g", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Verify metadata files were purged with MIN_BATCHES_TO_RETAIN=1
+        val finalMetadataFiles = getFiles(metadataPath).length
+        // We expect the dummy folder and 2 metadata files
+        assert(finalMetadataFiles <= 3,
+          s"Expected metadata files to be purged to at most 3, but found $finalMetadataFiles")
+
+        // Verify schema files were NOT purged despite aggressive metadata purging
+        val schemaFiles = getFiles(stateSchemaPath).map(_.getPath.getName)
+        val finalSchemaFiles = schemaFiles.length
+        assert(finalSchemaFiles >= 5,
+          s"Expected at least 5 schema files to be retained" +
+            s" (one per schema evolution), but found $finalSchemaFiles")
+        assert(schemaFiles.contains(dummySchemaPath.getName))
+
+        // Verify we can read historical state for different batches
+        // This should work even though metadata may have been purged for earlier batches
+        val latestStateDf = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .load()
+
+        assert(latestStateDf.count() > 0, "Expected to read current state data")
+
+        // Check schema of latest state - should have RenamedFields schema structure
+        val latestValueField = latestStateDf.schema.fields.find(_.name == "value").get
+        val latestValueType = latestValueField.dataType.asInstanceOf[StructType]
+
+        // Should have value4 field from RenamedFields
+        assert(latestValueType.fields.exists(f => f.name == "value4"),
+          "Expected renamed schema with value4 field")
+      }
+    }
+  }
+
   test("transformWithState - adding field should succeed") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index bda26da88679..ec17adde2bf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.timestamp_seconds
 import org.apache.spark.sql.internal.SQLConf

@@ -1830,6 +1831,21 @@ abstract class TransformWithStateSuite extends StateStoreMetricsTest
       CheckNewAnswer(("a", "1")),
       StopStream
     )
+
+    // Here we are writing non-metadata files to the operator metadata directory to ensure that
+    // they are ignored during restart.
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fm = CheckpointFileManager.create(new Path(checkpointDir.toString),
+      hadoopConf)
+    fm.mkdirs(new Path(new Path(checkpointDir.toString, DIR_NAME_OFFSETS),
+      "dummy_path_name"))
+    fm.mkdirs(
+      new Path(OperatorStateMetadataV2.metadataDirPath(
+        new Path(new Path(new Path(checkpointDir.toString), "state"), "0")
+      ),
+        "dummy_path_name")
+    )
+
     val result2 = inputData.toDS()
       .groupByKey(x => x)
       .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
@@ -1886,17 +1902,17 @@ abstract class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }

-  private def getFiles(path: Path): Array[FileStatus] = {
+  private[sql] def getFiles(path: Path): Array[FileStatus] = {
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fileManager = CheckpointFileManager.create(path, hadoopConf)
     fileManager.list(path)
   }

-  private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+  private[sql] def getStateSchemaPath(stateCheckpointPath: Path): Path = {
     new Path(stateCheckpointPath, "_stateSchema/default/")
   }

-  // TODO: [SPARK-50845] Re-enable tests after StateSchemaV3 threshold change
+  // TODO: [SPARK-50845] Re-enable tests after full-rewrite is enabled.
   ignore("transformWithState - verify that metadata and schema logs are purged") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
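
The pattern the patch applies in both listing paths is the same: parse the file name defensively and drop anything that does not parse, instead of letting toLong throw. Below is a minimal standalone sketch of that idea, using plain java.nio.file in place of Spark's CheckpointFileManager and Hadoop Path; the object and method names (TolerantBatchListing, listBatchIds) are made up for illustration and are not Spark API.

import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._
import scala.util.Try

object TolerantBatchListing {
  // List batch files under `dir`, keeping only names that parse as a Long.
  // A stray entry such as a "dummy_path_name" directory or a hidden file is
  // skipped instead of blowing up with NumberFormatException.
  def listBatchIds(dir: Path): Array[Long] = {
    if (!Files.exists(dir)) return Array.empty
    val stream = Files.list(dir)
    try {
      stream.iterator().asScala
        .map(_.getFileName.toString)
        .filterNot(_.startsWith("."))               // ignore hidden files
        .flatMap(name => Try(name.toLong).toOption) // ignore non-numeric names
        .toArray
        .sorted
    } finally stream.close()
  }
}

Against a checkpoint laid out like the tests above, listBatchIds on the offsets directory would keep returning only the numeric batch files even after a dummy_path_name directory has been created next to them.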
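The schema-file side adds one wrinkle: only the {batchId} prefix of a {batchId}_{UUID} file name is numeric, and purging stays disabled until full rewrite lands (SPARK-50845). A sketch under the same assumptions (plain java.nio.file, hypothetical names):

import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._
import scala.util.Try

object SchemaFilePurgeSketch {
  // StateSchemaV3 file names have the form "{batchId}_{UUID}". Select only the
  // files whose leading batchId parses as a Long and falls at or below the
  // threshold; malformed names (e.g. "__dummy_file_path") are never selected.
  def schemaFilesUpTo(dir: Path, thresholdBatchId: Long): List[Path] = {
    if (thresholdBatchId <= 0) return Nil // mirrors the patch's no-op guard
    val stream = Files.list(dir)
    try {
      stream.iterator().asScala.toList.filter { p =>
        Try(p.getFileName.toString.split("_").head.toLong)
          .toOption
          .exists(_ <= thresholdBatchId)
      }
    } finally stream.close()
  }
}

With this shape, finishing SPARK-50845 only requires passing a positive threshold: the selection logic already refuses to touch any file it cannot attribute to a batch, such as the dummy entries the tests above create.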