This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c7024d35c45a [SPARK-51714][SS] Add Failure Injection test to test state store checkpoint format V2
c7024d35c45a is described below

commit c7024d35c45a6e4f456f2a3ce5811a59586377cd
Author: Siying Dong <dong...@gmail.com>
AuthorDate: Wed Apr 9 13:50:24 2025 +0900

    [SPARK-51714][SS] Add Failure Injection test to test state store checkpoint format V2
    
    ### Why are the changes needed?
    The new state store checkpoint format needs failure tolerance tests to make sure the implementation is correct and delivers the behavior we would like.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    The change is test code itself.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50508 from siying/ingest_failure9.
    
    Lead-authored-by: Siying Dong <dong...@gmail.com>
    Co-authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    |  18 +-
 .../streaming/state/RocksDBFileManager.scala       |   8 +-
 .../state/RocksDBStateStoreProvider.scala          |  22 +-
 .../sql/execution/streaming/state/StateStore.scala |   8 +
 .../FailureInjectionCheckpointFileManager.scala    | 312 ++++++++++++
 .../RocksDBCheckpointFailureInjectionSuite.scala   | 535 +++++++++++++++++++++
 6 files changed, 899 insertions(+), 4 deletions(-)
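
 At a high level, RocksDB, RocksDBFileManager, and RocksDBStateStoreProvider gain protected
 factory hooks (createFileManager, getFileSystem, createRocksDB) that the new test-only classes
 override to swap in a failure-injecting FileSystem and checkpoint file manager. A minimal sketch
 of how a test is expected to drive failures through the helpers added in this patch is shown
 below; the checkpointDir value, the regex, and the commented-out steps are illustrative
 assumptions, while the helper and field names come from the new test files:

    // Sketch only: register a temp checkpoint dir, inject a close() failure for
    // checkpoint zip files, then let the delayed write finish later.
    val injectionState =
      FailureInjectionFileSystem.addPathToTempToInjectionState(checkpointDir.getPath)
    try {
      // Fail close() of any checkpoint zip file and park the underlying stream.
      injectionState.createAtomicDelayCloseRegex = Seq(".*zip")
      // ... run the RocksDB commit / streaming batch that is expected to fail ...

      // Later, let the "stuck" writer finish, simulating a zombie task coming back.
      injectionState.delayedStreams.foreach(_.close())
    } finally {
      FailureInjectionFileSystem.removePathFromTempToInjectionState(checkpointDir.getPath)
    }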

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index befb1dbdc23b..15df2fae8260 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -135,7 +135,23 @@ class RocksDB(
   private val nativeStats = rocksDbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
+
+  protected def createFileManager(
+      dfsRootDir: String,
+      localTempDir: File,
+      hadoopConf: Configuration,
+      codecName: String,
+      loggingId: String): RocksDBFileManager = {
+    new RocksDBFileManager(
+      dfsRootDir,
+      localTempDir,
+      hadoopConf,
+      codecName,
+      loggingId = loggingId
+    )
+  }
+
+  private val fileManager = createFileManager(dfsRootDir, createTempDir("fileManager"),
     hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index a5cd2218986c..562a57aafbd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -32,7 +32,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
@@ -133,8 +133,12 @@ class RocksDBFileManager(
 
   import RocksDBImmutableFile._
 
+  protected def getFileSystem(myDfsRootDir: String, myHadoopConf: Configuration) : FileSystem = {
+    new Path(myDfsRootDir).getFileSystem(myHadoopConf)
+  }
+
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
-  private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
+  private val fs = getFileSystem(dfsRootDir, hadoopConf)
   private val onlyZipFiles = new PathFilter {
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 15794cada675..601caaa34290 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -524,13 +524,33 @@ private[sql] class RocksDBStateStoreProvider
   @volatile private var stateStoreEncoding: String = _
   @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _
 
+  protected def createRocksDB(
+      dfsRootDir: String,
+      conf: RocksDBConf,
+      localRootDir: File,
+      hadoopConf: Configuration,
+      loggingId: String,
+      useColumnFamilies: Boolean,
+      enableStateStoreCheckpointIds: Boolean,
+      partitionId: Int = 0): RocksDB = {
+    new RocksDB(
+      dfsRootDir,
+      conf,
+      localRootDir,
+      hadoopConf,
+      loggingId,
+      useColumnFamilies,
+      enableStateStoreCheckpointIds,
+      partitionId)
+  }
+
   private[sql] lazy val rocksDB = {
     val dfsRootDir = stateStoreId.storeCheckpointLocation().toString
     val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," +
       s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})"
     val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
     val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr)
-    new RocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr,
+    createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr,
       useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 33a21c79f3db..ccb925287e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -988,6 +988,14 @@ object StateStore extends Logging {
 
   /** Stop maintenance thread and reset the maintenance task */
   def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
+    stopMaintenanceTaskWithoutLock()
+  }
+
+  /**
+   * Only used for unit tests. This function doesn't hold the loadedProviders lock. Calling
+   * it can work around a deadlock condition where a maintenance task is waiting for the lock.
+   */
+  private[streaming] def stopMaintenanceTaskWithoutLock(): Unit = {
     if (maintenanceThreadPool != null) {
       maintenanceThreadPoolLock.synchronized {
         maintenancePartitions.clear()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala
new file mode 100644
index 000000000000..4711a45804fb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io._
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.util.Progressable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
+import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager
+
+/**
+ * A wrapper file output stream that throws an exception in close() and adds the underlying
+ * stream to the corresponding FailureInjectionState.delayedStreams.
+ * @param stream the stream to be wrapped
+ */
+class DelayCloseFSDataOutputStreamWrapper(
+    stream: CancellableFSDataOutputStream,
+    injectionState: FailureInjectionState)
+  extends CancellableFSDataOutputStream(stream.getWrappedStream) with Logging {
+  val originalStream: CancellableFSDataOutputStream = stream
+
+  var closed: Boolean = false
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      injectionState.delayedStreams = injectionState.delayedStreams :+ originalStream
+      throw new IOException("Fake File Stream Close Failure")
+    }
+  }
+
+  /** Cancel is not needed in unit tests */
+  override def cancel(): Unit = {}
+}
+
+/**
+ * A wrapper checkpoint file manager that might inject failures in some function calls.
+ * Used in unit tests to simulate failure scenarios.
+ * This can be put into SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS to provide failure
+ * injection behavior.
+ *
+ * @param path The path to the checkpoint directory, passed to the parent class
+ * @param hadoopConf  hadoop conf that will be passed to the parent class
+ */
+class FailureInjectionCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends FileSystemBasedCheckpointFileManager(path, hadoopConf) with Logging {
+
+  // Injection state for the path
+  private val injectionState = FailureInjectionFileSystem.getInjectionState(path.toString)
+
+  override def createAtomic(path: Path,
+                            overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    injectionState.failureCreateAtomicRegex.foreach { pattern =>
+      if (path.toString.matches(pattern)) {
+        throw new IOException("Fake File System Create Atomic Failure")
+      }
+    }
+
+    var shouldDelay = false
+    injectionState.createAtomicDelayCloseRegex.foreach { pattern =>
+      if (path.toString.matches(pattern)) {
+        shouldDelay = true
+      }
+    }
+    val ret = new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+    if (shouldDelay) {
+      new DelayCloseFSDataOutputStreamWrapper(ret, injectionState)
+    } else {
+      ret
+    }
+  }
+
+  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+    if (injectionState.allowOverwriteInRename || !fs.exists(dstPath)) {
+      super.renameTempFile(srcPath, dstPath, overwriteIfPossible)
+    } else {
+      logWarning(s"Skip renaming temp file $srcPath to $dstPath because it already exists.")
+    }
+  }
+
+  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+    super.list(path, filter)
+  }
+
+  override def exists(path: Path): Boolean = {
+    if (injectionState.shouldFailExist) {
+      throw new IOException("Fake File Exists Failure")
+    }
+    super.exists(path)
+  }
+}
+
+/**
+ * A class that contains the failure injection state for a path.
+ * Variables in this class cannot be updated concurrently by two threads, but can have multiple
+ * readers.
+ */
+class FailureInjectionState {
+  // File names matching this regex will cause the copyFromLocalFile to fail
+  @volatile
+  var failPreCopyFromLocalFileNameRegex: Seq[String] = Seq.empty
+  // File names matching this regex will cause the close() to fail and put the streams in
+  // `delayedStreams`
+  @volatile
+  var createAtomicDelayCloseRegex: Seq[String] = Seq.empty
+  // File names matching this regex will cause the createAtomic() to fail
+  @volatile
+  var failureCreateAtomicRegex: Seq[String] = Seq.empty
+  // If true, exists() calls will fail
+  @volatile
+  var shouldFailExist: Boolean = false
+  // If false, simulate a case where rename() will not overwrite an existing file.
+  @volatile
+  var allowOverwriteInRename: Boolean = true
+
+  // List of streams that are delayed in close() based on `createAtomicDelayCloseRegex`
+  @volatile
+  var delayedStreams: Seq[CancellableFSDataOutputStream] = Seq.empty
+}
+
+/**
+ * Contains the failure injection states used to control failure injection conditions.
+ * These are singleton instances accessed by all instances of FailureInjectionCheckpointFileManager
+ * and FailureInjectionFileSystem. This allows a unit test to have global control of failure
+ * injection and access to the delayed streams.
+ */
+object FailureInjectionFileSystem {
+  // A map from a temp path to its failure injection state.
+  var tempPathToInjectionState: Map[String, FailureInjectionState] = Map.empty
+
+  /**
+   * Create a new FailureInjectionState for a temp path and add it to the map.
+   * @param path  the temp path
+   * @return  the newly created failure injection state
+   */
+  def addPathToTempToInjectionState(path: String): FailureInjectionState = synchronized {
+    // Throw exception if the path already exists in the map
+    assert(!tempPathToInjectionState.contains(path), s"Path $path already exists in the map")
+    tempPathToInjectionState = tempPathToInjectionState + (path -> new FailureInjectionState)
+    tempPathToInjectionState(path)
+  }
+
+  /**
+   * Clean up a temp path and its failure injection state.
+   * @param path the temp path to be cleaned up
+   */
+  def removePathFromTempToInjectionState(path: String): Unit = synchronized {
+    tempPathToInjectionState = tempPathToInjectionState - path
+  }
+
+  /**
+   * Find the injection state based on the temp dir prefix.
+   * @param path a path with a temp dir as prefix
+   * @return the injection state if the path is in the map; throws an exception if there is no
+   *         match for the prefix
+   */
+  def getInjectionState(path: String): FailureInjectionState = synchronized {
+    // remove "file://" prefix from path
+    val cleanedPath = path.replace("file:/", "/")
+    // return the injection state if the path in the map is prefix of path
+    val pathPrefix = tempPathToInjectionState.keys.find(cleanedPath.startsWith)
+    assert(pathPrefix.isDefined)
+    tempPathToInjectionState.get(pathPrefix.get).get
+  }
+}
+
+/**
+ * A wrapper FileSystem that injects some failures. This class can be used to replace the
+ * FileSystem in RocksDBFileManager.
+ * @param innerFs  the FileSystem to be wrapped
+ */
+class FailureInjectionFileSystem(innerFs: FileSystem) extends FileSystem {
+
+  override def getConf: Configuration = innerFs.getConf
+
+  override def mkdirs(f: Path, permission: FsPermission): Boolean = innerFs.mkdirs(f, permission)
+
+  override def rename(src: Path, dst: Path): Boolean = innerFs.rename(src, dst)
+
+  override def getUri: URI = innerFs.getUri
+
+  override def open(f: Path, bufferSize: Int): FSDataInputStream = innerFs.open(f, bufferSize)
+
+  override def create(
+      f: Path,
+      permission: FsPermission,
+      overwrite: Boolean,
+      bufferSize: Int,
+      replication: Short,
+      blockSize: Long,
+      progress: Progressable): FSDataOutputStream =
+    innerFs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress)
+
+  override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream =
+    innerFs.append(f, bufferSize, progress)
+
+  override def delete(f: Path, recursive: Boolean): Boolean = innerFs.delete(f, recursive)
+
+  override def listStatus(f: Path): Array[FileStatus] = innerFs.listStatus(f)
+
+  override def setWorkingDirectory(new_dir: Path): Unit = innerFs.setWorkingDirectory(new_dir)
+
+  override def getWorkingDirectory: Path = innerFs.getWorkingDirectory
+
+  override def getFileStatus(f: Path): FileStatus = innerFs.getFileStatus(f)
+
+  override def copyFromLocalFile(src: Path, dst: Path): Unit = {
+    // Find injection state based on the destination path
+    val injectionState = FailureInjectionFileSystem.getInjectionState(dst.toString)
+
+    injectionState.failPreCopyFromLocalFileNameRegex.foreach { pattern =>
+      if (src.toString.matches(pattern)) {
+        throw new IOException(s"Injected failure due to source path matching pattern: $pattern")
+      }
+    }
+
+    innerFs.copyFromLocalFile(src, dst)
+  }
+}
+
+/**
+ * A wrapper RocksDB State Store Provider that replaces the FileSystem used in RocksDBFileManager
+ * with FailureInjectionFileSystem.
+ */
+class FailureInjectionRocksDBStateStoreProvider extends RocksDBStateStoreProvider {
+  override def createRocksDB(
+      dfsRootDir: String,
+      conf: RocksDBConf,
+      localRootDir: File,
+      hadoopConf: Configuration,
+      loggingId: String,
+      useColumnFamilies: Boolean,
+      enableStateStoreCheckpointIds: Boolean,
+      partitionId: Int): RocksDB = {
+    FailureInjectionRocksDBStateStoreProvider.createRocksDBWithFaultInjection(
+      dfsRootDir,
+      conf,
+      localRootDir,
+      hadoopConf,
+      loggingId,
+      useColumnFamilies,
+      enableStateStoreCheckpointIds,
+      partitionId)
+  }
+}
+
+object FailureInjectionRocksDBStateStoreProvider {
+  /**
+   * RocksDBFileManager is created by the RocksDB class, where it creates a default FileSystem.
+   * Here we make RocksDB create a RocksDBFileManager that uses a different FileSystem.
+   */
+  def createRocksDBWithFaultInjection(
+      dfsRootDir: String,
+      conf: RocksDBConf,
+      localRootDir: File,
+      hadoopConf: Configuration,
+      loggingId: String,
+      useColumnFamilies: Boolean,
+      enableStateStoreCheckpointIds: Boolean,
+      partitionId: Int): RocksDB = {
+    new RocksDB(
+      dfsRootDir,
+      conf = conf,
+      localRootDir = localRootDir,
+      hadoopConf = hadoopConf,
+      loggingId = loggingId,
+      useColumnFamilies = useColumnFamilies,
+      enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+      partitionId = partitionId
+    ) {
+      override def createFileManager(
+          dfsRootDir: String,
+          localTempDir: File,
+          hadoopConf: Configuration,
+          codecName: String,
+          loggingId: String): RocksDBFileManager = {
+        new RocksDBFileManager(
+          dfsRootDir,
+          localTempDir,
+          hadoopConf,
+          codecName,
+          loggingId = loggingId) {
+          override def getFileSystem(
+              myDfsRootDir: String,
+              myHadoopConf: Configuration): FileSystem = {
+            new FailureInjectionFileSystem(new Path(myDfsRootDir).getFileSystem(myHadoopConf))
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
new file mode 100644
index 000000000000..31fc51c4d56f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io._
+
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.count
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.OutputMode.Update
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.SlowSQLTest
+import org.apache.spark.util.Utils
+
+
+@SlowSQLTest
+/** Test suite to inject some failures in RocksDB checkpoint */
+class RocksDBCheckpointFailureInjectionSuite extends StreamTest
+  with SharedSparkSession {
+
+  private val fileManagerClassName = classOf[FailureInjectionCheckpointFileManager].getName
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+  }
+
+  /**
+   * Create a temp dir, register it to the failure injection file system, and remove it at the end.
+   * The function can be moved to StreamTest if it is used by other tests too.
+   * @param f the function to run with the temp dir and the injection state
+   */
+  def withTempDirAllowFailureInjection(f: (File, FailureInjectionState) => Unit): Unit = {
+    withTempDir { dir =>
+      val injectionState = FailureInjectionFileSystem.addPathToTempToInjectionState(dir.getPath)
+      try {
+        f(dir, injectionState)
+      } finally {
+        FailureInjectionFileSystem.removePathFromTempToInjectionState(dir.getPath)
+      }
+    }
+  }
+
+  implicit def toArray(str: String): Array[Byte] = if (str != null) str.getBytes else null
+
+  case class FailureConf(ifEnableStateStoreCheckpointIds: Boolean, fileType: String) {
+    override def toString: String = {
+      s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds, " +
+        s"fileType = $fileType"
+    }
+  }
+
+  Seq(
+    FailureConf(ifEnableStateStoreCheckpointIds = false, "zip"),
+    FailureConf(ifEnableStateStoreCheckpointIds = false, "sst"),
+    FailureConf(ifEnableStateStoreCheckpointIds = true, "zip"),
+    FailureConf(ifEnableStateStoreCheckpointIds = true, "sst")).foreach { testConf =>
+    test(s"Basic RocksDB SST File Upload Failure Handling $testConf") {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
+      withTempDirAllowFailureInjection { (remoteDir, injectionState) =>
+        withSQLConf(
+          RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false") {
+          val conf = RocksDBConf(StateStoreConf(SQLConf.get))
+          withDB(
+            remoteDir.getAbsolutePath,
+            version = 0,
+            conf = conf,
+            hadoopConf = hadoopConf,
+            enableStateStoreCheckpointIds = testConf.ifEnableStateStoreCheckpointIds) { db =>
+            db.put("version", "1.1")
+            val checkpointId1 = commitAndGetCheckpointId(db)
+
+            if (testConf.fileType == "sst") {
+              injectionState.failPreCopyFromLocalFileNameRegex = Seq(".*sst")
+            } else {
+              assert(testConf.fileType == "zip")
+              injectionState.failureCreateAtomicRegex = Seq(".*zip")
+            }
+            db.put("version", "2.1")
+            var checkpointId2: Option[String] = None
+            intercept[IOException] {
+              checkpointId2 = commitAndGetCheckpointId(db)
+            }
+
+            db.load(1, checkpointId1)
+
+            injectionState.failPreCopyFromLocalFileNameRegex = Seq.empty
+            injectionState.failureCreateAtomicRegex = Seq.empty
+            // When ifEnableStateStoreCheckpointIds is true, checkpointId is not available
+            // to load version 2. If we use None, it will throw a runtime error. We probably
+            // should categorize this error.
+            // TODO: test the ifEnableStateStoreCheckpointIds = true case after it is fixed.
+            if (!testConf.ifEnableStateStoreCheckpointIds) {
+              // Make sure that the checkpoint can't be loaded as some files aren't uploaded
+              // correctly.
+              val ex = intercept[SparkException] {
+                db.load(2, checkpointId2)
+              }
+              checkError(
+                ex,
+                condition = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+                parameters = Map(
+                  "fileToRead" -> s"$remoteDir/2.changelog"
+                )
+              )
+            }
+
+            db.load(0)
+            injectionState.shouldFailExist = true
+            intercept[IOException] {
+              db.load(1, checkpointId1)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This test simulates the case where a previous task had a connectivity problem, couldn't
+   * be killed, and didn't write the zip file. Only after a later attempt successfully commits
+   * does it come back and write the zip file.
+   */
+  Seq(true, false).foreach { ifEnableStateStoreCheckpointIds =>
+    test(
+      "Zip File Overwritten by Previous Task Checkpoint " +
+      s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds") {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
+      withTempDirAllowFailureInjection { (remoteDir, injectionState) =>
+        withSQLConf(
+          RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false") {
+          val conf = RocksDBConf(StateStoreConf(SQLConf.get))
+
+          var checkpointId2: Option[String] = None
+          withDB(
+            remoteDir.getAbsolutePath,
+            version = 0,
+            conf = conf,
+            hadoopConf = hadoopConf,
+            enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds) { db =>
+            db.put("version", "1.1")
+            val checkpointId1 = commitAndGetCheckpointId(db)
+
+            db.load(1, checkpointId1)
+            injectionState.createAtomicDelayCloseRegex = Seq(".*zip")
+            db.put("version", "2.1")
+
+            intercept[IOException] {
+              commitAndGetCheckpointId(db)
+            }
+
+            injectionState.createAtomicDelayCloseRegex = Seq.empty
+
+            db.load(1, checkpointId1)
+
+            db.put("version", "2.2")
+            checkpointId2 = commitAndGetCheckpointId(db)
+
+            assert(injectionState.delayedStreams.nonEmpty)
+            injectionState.delayedStreams.foreach(_.close())
+          }
+          withDB(
+            remoteDir.getAbsolutePath,
+            version = 2,
+            conf = conf,
+            hadoopConf = hadoopConf,
+            enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds,
+            checkpointId = checkpointId2) { db =>
+            if (ifEnableStateStoreCheckpointIds) {
+              // If checkpointV2 is used, writing a checkpoint file from a previously failed
+              // batch should be ignored.
+              assert(new String(db.get("version"), "UTF-8") == "2.2")
+            } else {
+              // Assuming the previous 2.zip overwrites, we should see the previous value.
+              // This validation isn't necessary here, but we would like to make sure
+              // FailureInjectionCheckpointFileManager has the correct behavior -- it allows zip
+              // files to be delayed to be written, so that the test for
+              // ifEnableStateStoreCheckpointIds = true is valid.
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This test simulates the case where a previous task had a connectivity problem, couldn't
+   * be killed, and didn't write the changelog file. Only after a later attempt successfully
+   * commits does it come back and write the changelog file.
+   */
+  Seq(false, true).foreach { ifEnableStateStoreCheckpointIds =>
+    test(
+      "Changelog File Overwritten by Previous Task With Changelog Checkpoint " 
+
+      s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds") {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
+      withTempDirAllowFailureInjection { (remoteDir, injectionState) =>
+        withSQLConf(
+          RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "5") {
+          val conf = RocksDBConf(StateStoreConf(SQLConf.get))
+
+          var checkpointId2: Option[String] = None
+          withDB(
+            remoteDir.getAbsolutePath,
+            version = 0,
+            conf = conf,
+            hadoopConf = hadoopConf,
+            enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds) { db =>
+            db.put("version", "1.1")
+            val checkpointId1 = commitAndGetCheckpointId(db)
+
+            injectionState.createAtomicDelayCloseRegex = Seq(".*/2.*changelog")
+
+            db.load(1, checkpointId1)
+            db.put("version", "2.1")
+            intercept[IOException] {
+              commitAndGetCheckpointId(db)
+            }
+
+            injectionState.createAtomicDelayCloseRegex = Seq.empty
+
+            db.load(1, checkpointId1)
+
+            db.put("version", "2.2")
+            checkpointId2 = commitAndGetCheckpointId(db)
+
+            assert(injectionState.delayedStreams.nonEmpty)
+            injectionState.delayedStreams.foreach(_.close())
+
+            db.load(1, checkpointId1)
+            db.load(2, checkpointId2)
+          }
+
+          withDB(
+            remoteDir.getAbsolutePath,
+            version = 2,
+            conf = conf,
+            hadoopConf = hadoopConf,
+            enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds,
+            checkpointId = checkpointId2) { db =>
+            if (ifEnableStateStoreCheckpointIds) {
+              // If checkpointV2 is used, writing a checkpoint file from a previously failed
+              // batch should be ignored.
+              assert(new String(db.get("version"), "UTF-8") == "2.2")
+            } else {
+              // This check is not necessary, but we would like to validate the behavior of
+              // FailureInjectionFileSystem to ensure the ifEnableStateStoreCheckpointIds = true
+              // case is valid.
+              assert(new String(db.get("version"), "UTF-8") == "2.1")
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This test is to simulate the case where
+   * 1. There is a snapshot checkpoint scheduled
+   * 2. The batch eventually failed
+   * 3. Query is retried and moved forward
+   * 4. The snapshot checkpoint succeeded
+   * In checkpoint V2, this snapshot shouldn't take effect. Otherwise, it will break the strong
+   * consistency guaranteed by V2.
+   */
+  test("Delay Snapshot V2") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
+    withTempDirAllowFailureInjection { (remoteDir, _) =>
+      withSQLConf(
+        RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+        SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") {
+        val conf = RocksDBConf(StateStoreConf(SQLConf.get))
+        var checkpointId3: Option[String] = None
+        withDB(
+          remoteDir.getAbsolutePath,
+          version = 0,
+          conf = conf,
+          hadoopConf = hadoopConf,
+          enableStateStoreCheckpointIds = true) { db =>
+          db.put("version", "1.1")
+          val checkpointId1 = commitAndGetCheckpointId(db)
+
+          // Creating another DB which will force a snapshot checkpoint to an older version.
+          withDB(
+            remoteDir.getAbsolutePath,
+            version = 1,
+            conf = conf,
+            hadoopConf = hadoopConf,
+            enableStateStoreCheckpointIds = true,
+            checkpointId = checkpointId1) { db2 =>
+            db2.put("version", "2.1")
+            db2.commit()
+
+            db.load(1, checkpointId1)
+            db.put("version", "2.2")
+            val checkpointId2 = commitAndGetCheckpointId(db)
+
+            db.load(2, checkpointId2)
+            db.put("foo", "bar")
+            checkpointId3 = commitAndGetCheckpointId(db)
+
+            db2.doMaintenance()
+          }
+        }
+        withDB(
+          remoteDir.getAbsolutePath,
+          version = 3,
+          conf = conf,
+          hadoopConf = hadoopConf,
+          enableStateStoreCheckpointIds = true,
+          checkpointId = checkpointId3) { db =>
+          // Checkpointing V2 should ignore the snapshot checkpoint from the previous batch.
+          assert(new String(db.get("version"), "UTF-8") == "2.2")
+          assert(new String(db.get("foo"), "UTF-8") == "bar")
+        }
+      }
+    }
+  }
+
+  import testImplicits._
+
+  /**
+   * An integrated test where a previous changelog from a failed batch comes back and finishes
+   * writing. In checkpoint V2, this changelog should be ignored.
+   * Test it both with and without rename overwriting.
+   */
+  Seq(false, true).foreach { ifAllowRenameOverwrite =>
+    test(s"Job failure with changelog shows up ifAllowRenameOverwrite = 
$ifAllowRenameOverwrite") {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
+      val rocksdbChangelogCheckpointingConfKey =
+        RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
+
+      withTempDirAllowFailureInjection { (checkpointDir, injectionState) =>
+        injectionState.allowOverwriteInRename = ifAllowRenameOverwrite
+        withSQLConf(
+          rocksdbChangelogCheckpointingConfKey -> "true",
+          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") {
+          val inputData = MemoryStream[Int]
+          val aggregated =
+            inputData.toDF()
+              .groupBy($"value")
+              .agg(count("*"))
+              .as[(Int, Long)]
+
+          injectionState.createAtomicDelayCloseRegex = Seq(".*/2_.*changelog")
+
+          testStream(aggregated, Update)(
+            StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+              additionalConfs = Map(
+                rocksdbChangelogCheckpointingConfKey -> "true",
+                SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
+                STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
+            AddData(inputData, 3),
+            CheckLastBatch((3, 1)),
+            AddData(inputData, 3, 2),
+            // The second batch should fail to commit because the changelog file is not uploaded
+            ExpectFailure[SparkException] { ex =>
+              ex.getCause.getMessage.contains("CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT")
+            }
+          )
+          assert(injectionState.delayedStreams.nonEmpty)
+          injectionState.delayedStreams.foreach(_.close())
+          injectionState.delayedStreams = Seq.empty
+          injectionState.createAtomicDelayCloseRegex = Seq.empty
+
+          inputData.addData(3, 1)
+
+          // The query will restart successfully and start at the checkpoint after Batch 1
+          testStream(aggregated, Update)(
+            StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+              additionalConfs = Map(
+                rocksdbChangelogCheckpointingConfKey -> "true",
+                SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
+                STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
+            AddData(inputData, 4),
+            CheckLastBatch((3, 3), (1, 1), (4, 1)),
+            StopStream
+          )
+        }
+      }
+    }
+  }
+
+  /**
+   * An integrated test to cover this scenario:
+   * 1. A batch is running and a snapshot checkpoint is scheduled
+   * 2. The batch fails
+   * 3. The query restarts
+   * 4. The snapshot checkpoint succeeds
+   * and validates that the snapshot checkpoint is not used in a subsequent query restart.
+   */
+  test("Previous Maintenance Snapshot Checkpoint Overwrite") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName)
+    val rocksdbChangelogCheckpointingConfKey =
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
+    withTempDirAllowFailureInjection { (checkpointDir, injectionState) =>
+      withSQLConf(
+        rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") {
+        val inputData = MemoryStream[Int]
+        val aggregated =
+          inputData.toDF()
+            .groupBy($"value")
+            .agg(count("*"))
+            .as[(Int, Long)]
+
+        injectionState.createAtomicDelayCloseRegex = Seq(".*/*zip")
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+            additionalConfs = Map(
+              rocksdbChangelogCheckpointingConfKey -> "true",
+              SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
+              SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "20",
+              STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
+          AddData(inputData, 3),
+          CheckAnswer((3, 1)),
+          AddData(inputData, 3, 2),
+          AddData(inputData, 3, 1),
+          CheckNewAnswer((3, 3), (2, 1), (1, 1)),
+          AddData(inputData, 1),
+          CheckNewAnswer((1, 2)),
+          Execute { _ =>
+            // Here we wait for the maintenance thread to try to upload the zip file and fail
+            // for at least one task.
+            while (injectionState.delayedStreams.isEmpty) {
+              Thread.sleep(1)
+            }
+            // If we call StateStore.stop(), or let testStream() call it implicitly, it will
+            // cause a deadlock. This is a workaround until we make StateStore.stop() not
+            // deadlock.
+            StateStore.stopMaintenanceTaskWithoutLock()
+          },
+          StopStream
+        )
+        injectionState.createAtomicDelayCloseRegex = Seq.empty
+
+        // Query should still be restarted successfully without losing any data.
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+            additionalConfs = Map(
+              rocksdbChangelogCheckpointingConfKey -> "true",
+              SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
+              STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
+          AddData(inputData, 1),
+          CheckAnswer((1, 3)),
+          StopStream
+        )
+        assert(injectionState.delayedStreams.nonEmpty)
+        // This will finish uploading the snapshot checkpoint
+        injectionState.delayedStreams.foreach(_.close())
+        injectionState.delayedStreams = Seq.empty
+
+        // After previous snapshot checkpoint succeeded, the query can still be restarted correctly.
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+            additionalConfs = Map(
+              rocksdbChangelogCheckpointingConfKey -> "true",
+              SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2",
+              STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)),
+          AddData(inputData, 3, 1, 4),
+          CheckAnswer((3, 4), (1, 4), (4, 1)),
+          StopStream
+        )
+      }
+    }
+  }
+
+  def commitAndGetCheckpointId(db: RocksDB): Option[String] = {
+    val (v, ci) = db.commit()
+    ci.stateStoreCkptId
+  }
+
+  def withDB[T](
+      remoteDir: String,
+      version: Int,
+      conf: RocksDBConf,
+      hadoopConf: Configuration = new Configuration(),
+      enableStateStoreCheckpointIds: Boolean = false,
+      checkpointId: Option[String] = None)(
+      func: RocksDB => T): T = {
+    var db: RocksDB = null
+    try {
+      db = FailureInjectionRocksDBStateStoreProvider.createRocksDBWithFaultInjection(
+        remoteDir,
+        conf = conf,
+        localRootDir = Utils.createTempDir(),
+        hadoopConf = hadoopConf,
+        loggingId = s"[Thread-${Thread.currentThread.getId}]",
+        useColumnFamilies = true,
+        enableStateStoreCheckpointIds = enableStateStoreCheckpointIds,
+        partitionId = 0)
+      db.load(version, checkpointId)
+      func(db)
+    } finally {
+      if (db != null) {
+        db.close()
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

