This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5d71afc7ea0f [SPARK-51682][SS] State Store Checkpoint V2 should handle offset log ahead of commit log correctly
5d71afc7ea0f is described below

commit 5d71afc7ea0f7fbab9b0f6d85e589c3ac6bc8a78
Author: Siying Dong <dong...@gmail.com>
AuthorDate: Wed Apr 2 14:18:50 2025 +0900

    [SPARK-51682][SS] State Store Checkpoint V2 should handle offset log ahead of commit log correctly

    ### What changes were proposed in this pull request?
    When State Store Checkpoint format V2 is used, always read the checkpoint IDs back from the commit log when the query restarts, rather than only when the commit log matches the offset log.

    ### Why are the changes needed?
    There is currently a bug in how checkpoint IDs are read from the commit log on query restart: if the offset log is ahead of the commit log, the IDs are not read at all, so the tasks have no checkpoint ID to recover from and the query fails.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Added a unit test that fails without the fix.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #50480 from siying/checkpoint_v2_commit_read.

    Authored-by: Siying Dong <dong...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
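Before the patch itself, a short self-contained Scala sketch of the recovery decision it changes may help. The CommitMetadata case class and the recover helper below are hypothetical stand-ins, not the real classes in org.apache.spark.sql.execution.streaming; the sketch only illustrates why the checkpoint-ID read has to happen before branching on whether the offset log is ahead of the commit log.

import scala.collection.mutable

// A minimal model of the restart path, assuming simplified types.
object RestartRecoverySketch {
  // Hypothetical stand-in for the commit log entry; the real metadata class
  // carries more fields than the state store checkpoint IDs.
  case class CommitMetadata(stateUniqueIds: Option[Map[Long, Array[Array[String]]]])

  def recover(
      latestBatchId: Long,                              // latest batch in the offset log
      latestCommitted: Option[(Long, CommitMetadata)],  // latest entry in the commit log
      currentStateStoreCkptId: mutable.Map[Long, Array[Array[String]]]): Unit = {
    latestCommitted match {
      case Some((latestCommittedBatchId, commitMetadata)) =>
        // The fix: restore the checkpoint IDs unconditionally, before deciding
        // which batch to run next, so both branches below can see them.
        commitMetadata.stateUniqueIds.foreach(ids => currentStateStoreCkptId ++= ids)

        if (latestBatchId == latestCommittedBatchId) {
          // Last planned batch was committed: plan a fresh batch latestBatchId + 1.
        } else if (latestCommittedBatchId == latestBatchId - 1) {
          // Offset log is one batch ahead of the commit log: re-run batch latestBatchId.
          // Before the fix the IDs were only read in the branch above, so this path
          // left currentStateStoreCkptId empty and the restarted query failed.
        }
      case None => // nothing committed yet; start from scratch
    }
  }

  def main(args: Array[String]): Unit = {
    val ckptIds = mutable.Map.empty[Long, Array[Array[String]]]
    val meta = CommitMetadata(Some(Map(0L -> Array(Array("uuid-op0-part0")))))
    // The offset log already planned batch 1, but only batch 0 is committed.
    recover(1L, Some((0L, meta)), ckptIds)
    assert(ckptIds.nonEmpty) // IDs are recovered even though batch 1 is uncommitted
  }
}

In the MicroBatchExecution hunks below, the first hunk hoists exactly this read above the branch, and the second hunk removes the now-redundant copy inside the committed-batch branch.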
---
 .../execution/streaming/MicroBatchExecution.scala  |  6 +--
 .../RocksDBStateStoreCheckpointFormatV2Suite.scala | 59 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fe06cbb19c3a..c977a499edc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -501,6 +501,9 @@ class MicroBatchExecution(
      *  i.e., committedBatchId + 1 */
     commitLog.getLatest() match {
       case Some((latestCommittedBatchId, commitMetadata)) =>
+        commitMetadata.stateUniqueIds.foreach {
+          stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds
+        }
         if (latestBatchId == latestCommittedBatchId) {
           /* The last batch was successfully committed, so we can safely process a
            * new next batch but first:
@@ -520,9 +523,6 @@ class MicroBatchExecution(
             execCtx.startOffsets ++= execCtx.endOffsets
             watermarkTracker.setWatermark(
               math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
-            commitMetadata.stateUniqueIds.foreach {
-              stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds
-            }
         } else if (latestCommittedBatchId == latestBatchId - 1) {
           execCtx.endOffsets.foreach {
             case (source: Source, end: Offset) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index ffbeaead9512..22150ffde5db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -23,12 +23,12 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.Tag
 
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.{SparkContext, SparkException, TaskContext}
 import org.apache.spark.sql.{DataFrame, ForeachWriter}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream}
 import org.apache.spark.sql.execution.streaming.state.StateStoreTestsHelper
-import org.apache.spark.sql.functions.count
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.OutputMode.Update
@@ -836,6 +836,61 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends StreamTest
     }
   }
 
+  /**
+   * This test verifies behavior when there are failures while executing the query.
+   * When a restart happens, the job should still recover to the last committed batch.
+   */
+  testWithCheckpointInfoTracked(s"checkpointFormatVersion2 query failure and restart") {
+    withTempDir { checkpointDir =>
+
+      var forceTaskFailure = false
+      val failUDF = udf((value: Int) => {
+        if (forceTaskFailure) {
+          // This will fail all UDF calls to trigger query failures in the execution phase.
+          throw new RuntimeException("Ingest task failure")
+        }
+        value
+      })
+
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData
+          .toDF()
+          .select(failUDF($"value").as("value"))
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      testStream(aggregated, Update)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData, 3),
+        CheckLastBatch((3, 1)),
+        Execute { _ =>
+          forceTaskFailure = true
+        },
+        AddData(inputData, 3, 2),
+        ExpectFailure[SparkException] { ex =>
+          ex.getCause.getMessage.contains("FAILED_EXECUTE_UDF")
+        }
+      )
+
+      forceTaskFailure = false
+
+      // Test recovery
+      testStream(aggregated, Update)(
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(inputData, 3, 2, 1),
+        CheckLastBatch((3, 3), (2, 2), (1, 1)),
+        // By default we run in new tuple mode.
+        AddData(inputData, 4, 4, 4, 4),
+        CheckLastBatch((4, 4)),
+        AddData(inputData, 5, 5),
+        CheckLastBatch((5, 2)),
+        StopStream
+      )
+    }
+  }
+
   testWithCheckpointInfoTracked(s"checkpointFormatVersion2 validate ID") {
     withTempDir { checkpointDir =>
       val inputData = MemoryStream[Int]

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org