This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c7024d35c45a [SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2 c7024d35c45a is described below commit c7024d35c45a6e4f456f2a3ce5811a59586377cd Author: Siying Dong <dong...@gmail.com> AuthorDate: Wed Apr 9 13:50:24 2025 +0900 [SPARK-51714][SS] Add Failure Ingestion test to test state store checkpoint format V2 ### Why are the changes needed? The new state store checkpoint format needs failure tolerance tests to make sure the implementation is correct and delivers the behavior we would like. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? It is test code itself ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50508 from siying/ingest_failure9. Lead-authored-by: Siying Dong <dong...@gmail.com> Co-authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/RocksDB.scala | 18 +- .../streaming/state/RocksDBFileManager.scala | 8 +- .../state/RocksDBStateStoreProvider.scala | 22 +- .../sql/execution/streaming/state/StateStore.scala | 8 + .../FailureInjectionCheckpointFileManager.scala | 312 ++++++++++++ .../RocksDBCheckpointFailureInjectionSuite.scala | 535 +++++++++++++++++++++ 6 files changed, 899 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index befb1dbdc23b..15df2fae8260 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -135,7 +135,23 @@ class RocksDB( private val nativeStats = rocksDbOptions.statistics() private val workingDir = createTempDir("workingDir") - private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"), + + protected def createFileManager( + dfsRootDir: String, + localTempDir: File, + hadoopConf: Configuration, + codecName: String, + loggingId: String): RocksDBFileManager = { + new RocksDBFileManager( + dfsRootDir, + localTempDir, + hadoopConf, + codecName, + loggingId = loggingId + ) + } + + private val fileManager = createFileManager(dfsRootDir, createTempDir("fileManager"), hadoopConf, conf.compressionCodec, loggingId = loggingId) private val byteArrayPair = new ByteArrayPair() private val commitLatencyMs = new mutable.HashMap[String, Long]() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index a5cd2218986c..562a57aafbd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -32,7 +32,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} import org.apache.commons.io.{FilenameUtils, IOUtils} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization @@ -133,8 
+133,12 @@ class RocksDBFileManager( import RocksDBImmutableFile._ + protected def getFileSystem(myDfsRootDir: String, myHadoopConf: Configuration) : FileSystem = { + new Path(myDfsRootDir).getFileSystem(myHadoopConf) + } + private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf) - private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf) + private val fs = getFileSystem(dfsRootDir, hadoopConf) private val onlyZipFiles = new PathFilter { override def accept(path: Path): Boolean = path.toString.endsWith(".zip") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 15794cada675..601caaa34290 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -524,13 +524,33 @@ private[sql] class RocksDBStateStoreProvider @volatile private var stateStoreEncoding: String = _ @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _ + protected def createRocksDB( + dfsRootDir: String, + conf: RocksDBConf, + localRootDir: File, + hadoopConf: Configuration, + loggingId: String, + useColumnFamilies: Boolean, + enableStateStoreCheckpointIds: Boolean, + partitionId: Int = 0): RocksDB = { + new RocksDB( + dfsRootDir, + conf, + localRootDir, + hadoopConf, + loggingId, + useColumnFamilies, + enableStateStoreCheckpointIds, + partitionId) + } + private[sql] lazy val rocksDB = { val dfsRootDir = stateStoreId.storeCheckpointLocation().toString val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," + s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})" val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr) - new RocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr, + createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, storeIdStr, useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 33a21c79f3db..ccb925287e77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -988,6 +988,14 @@ object StateStore extends Logging { /** Stop maintenance thread and reset the maintenance task */ def stopMaintenanceTask(): Unit = loadedProviders.synchronized { + stopMaintenanceTaskWithoutLock() + } + + /** + * Only used for unit tests. The function doesn't hold loadedProviders lock. 
Calling + it can work around a deadlock condition where a maintenance task is waiting for the lock + * */ + private[streaming] def stopMaintenanceTaskWithoutLock(): Unit = { if (maintenanceThreadPool != null) { maintenanceThreadPoolLock.synchronized { maintenancePartitions.clear() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala new file mode 100644 index 000000000000..4711a45804fb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/FailureInjectionCheckpointFileManager.scala @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming.state + +import java.io._ +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.util.Progressable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream} +import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager + +/** + * A wrapper file output stream that will throw an exception in close() and add the underlying + * stream to FailureInjectionState.delayedStreams + * @param stream the stream to be wrapped + */ +class DelayCloseFSDataOutputStreamWrapper( + stream: CancellableFSDataOutputStream, + injectionState: FailureInjectionState) + extends CancellableFSDataOutputStream(stream.getWrappedStream) with Logging { + val originalStream: CancellableFSDataOutputStream = stream + + var closed: Boolean = false + + override def close(): Unit = { + if (!closed) { + closed = true + injectionState.delayedStreams = injectionState.delayedStreams :+ originalStream + throw new IOException("Fake File Stream Close Failure") + } + } + + /** Cancel is not needed in unit tests */ + override def cancel(): Unit = {} +} + +/** + * A wrapper checkpoint file manager that might inject failures in some function calls. + * Used in unit tests to simulate failure scenarios. + * This can be put into SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS to provide failure + * injection behavior.
+ * + * @param path The path to the checkpoint directory, passed to the parent class + * @param hadoopConf hadoop conf that will be passed to the parent class + */ +class FailureInjectionCheckpointFileManager(path: Path, hadoopConf: Configuration) + extends FileSystemBasedCheckpointFileManager(path, hadoopConf) with Logging { + + // Injection state for the path + private val injectionState = FailureInjectionFileSystem.getInjectionState(path.toString) + + override def createAtomic(path: Path, + overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { + injectionState.failureCreateAtomicRegex.foreach { pattern => + if (path.toString.matches(pattern)) { + throw new IOException("Fake File System Create Atomic Failure") + } + } + + var shouldDelay = false + injectionState.createAtomicDelayCloseRegex.foreach { pattern => + if (path.toString.matches(pattern)) { + shouldDelay = true + } + } + val ret = new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible) + if (shouldDelay) { + new DelayCloseFSDataOutputStreamWrapper(ret, injectionState) + } else { + ret + } + } + + override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { + if (injectionState.allowOverwriteInRename || !fs.exists(dstPath)) { + super.renameTempFile(srcPath, dstPath, overwriteIfPossible) + } else { + logWarning(s"Skip renaming temp file $srcPath to $dstPath because it already exists.") + } + } + + override def list(path: Path, filter: PathFilter): Array[FileStatus] = { + super.list(path, filter) + } + + override def exists(path: Path): Boolean = { + if (injectionState.shouldFailExist) { + throw new IOException("Fake File Exists Failure") + } + super.exists(path) + } +} + +/** + * A class that contains the failure injection state for a path. + * Variables in this class must not be updated concurrently by two threads, but may have multiple + * readers + */ +class FailureInjectionState { + // File names matching this regex will cause the copyFromLocalFile to fail + @volatile + var failPreCopyFromLocalFileNameRegex: Seq[String] = Seq.empty + // File names matching this regex will cause the close() to fail and put the streams in + // `delayedStreams` + @volatile + var createAtomicDelayCloseRegex: Seq[String] = Seq.empty + // File names matching this regex will cause the createAtomic() to fail + @volatile + var failureCreateAtomicRegex: Seq[String] = Seq.empty + // If true, exists() calls will fail + @volatile + var shouldFailExist: Boolean = false + // If false, simulate a case where rename() will not overwrite an existing file. + @volatile + var allowOverwriteInRename: Boolean = true + + // List of streams that are delayed in close() based on `createAtomicDelayCloseRegex` + @volatile + var delayedStreams: Seq[CancellableFSDataOutputStream] = Seq.empty +} + +/** + * Contains a list of variables for failure injection conditions. + * These are singleton instances accessed by all instances of FailureInjectionCheckpointFileManager + * and FailureInjectionFileSystem. This allows a unit test to have global control of failures + * and access to the delayed streams. + */ +object FailureInjectionFileSystem { + // A map from a temp path to its failure injection state. + var tempPathToInjectionState: Map[String, FailureInjectionState] = Map.empty + + /** + * Create a new FailureInjectionState for a temp path and add it to the map.
+ * @param path the temp path + * @return the newly created failure injection state + */ + def addPathToTempToInjectionState(path: String): FailureInjectionState = synchronized { + // Throw exception if the path already exists in the map + assert(!tempPathToInjectionState.contains(path), s"Path $path already exists in the map") + tempPathToInjectionState = tempPathToInjectionState + (path -> new FailureInjectionState) + tempPathToInjectionState(path) + } + + /** + * Clean up a temp path and its failure injection state + * @param path the temp path to be cleaned up + */ + def removePathFromTempToInjectionState(path: String): Unit = synchronized { + tempPathToInjectionState = tempPathToInjectionState - path + } + + /** + * Find the injection state based on the temp dir prefix + * @param path a path with the temp dir as a prefix + * @return the injection state if the path is in the map; fails an assertion if there is no + * matching prefix + */ + def getInjectionState(path: String): FailureInjectionState = synchronized { + // remove "file://" prefix from path + val cleanedPath = path.replace("file:/", "/") + // return the injection state if the path in the map is prefix of path + val pathPrefix = tempPathToInjectionState.keys.find(cleanedPath.startsWith) + assert(pathPrefix.isDefined) + tempPathToInjectionState.get(pathPrefix.get).get + } +} + +/** + * A wrapper FileSystem that injects some failures. This class can be used to replace the + * FileSystem in RocksDBFileManager. + * @param innerFs the FileSystem to be wrapped + */ +class FailureInjectionFileSystem(innerFs: FileSystem) extends FileSystem { + + override def getConf: Configuration = innerFs.getConf + + override def mkdirs(f: Path, permission: FsPermission): Boolean = innerFs.mkdirs(f, permission) + + override def rename(src: Path, dst: Path): Boolean = innerFs.rename(src, dst) + + override def getUri: URI = innerFs.getUri + + override def open(f: Path, bufferSize: Int): FSDataInputStream = innerFs.open(f, bufferSize) + + override def create( + f: Path, + permission: FsPermission, + overwrite: Boolean, + bufferSize: Int, + replication: Short, + blockSize: Long, + progress: Progressable): FSDataOutputStream = + innerFs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress) + + override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream = + innerFs.append(f, bufferSize, progress) + + override def delete(f: Path, recursive: Boolean): Boolean = innerFs.delete(f, recursive) + + override def listStatus(f: Path): Array[FileStatus] = innerFs.listStatus(f) + + override def setWorkingDirectory(new_dir: Path): Unit = innerFs.setWorkingDirectory(new_dir) + + override def getWorkingDirectory: Path = innerFs.getWorkingDirectory + + override def getFileStatus(f: Path): FileStatus = innerFs.getFileStatus(f) + + override def copyFromLocalFile(src: Path, dst: Path): Unit = { + // Find injection state based on the destination path + val injectionState = FailureInjectionFileSystem.getInjectionState(dst.toString) + + injectionState.failPreCopyFromLocalFileNameRegex.foreach { pattern => + if (src.toString.matches(pattern)) { + throw new IOException(s"Injected failure due to source path matching pattern: $pattern") + } + } + + innerFs.copyFromLocalFile(src, dst) + } +} + +/** + * A wrapper RocksDB State Store Provider that replaces the FileSystem used in RocksDBFileManager + * with FailureInjectionFileSystem.
+ */ +class FailureInjectionRocksDBStateStoreProvider extends RocksDBStateStoreProvider { + override def createRocksDB( + dfsRootDir: String, + conf: RocksDBConf, + localRootDir: File, + hadoopConf: Configuration, + loggingId: String, + useColumnFamilies: Boolean, + enableStateStoreCheckpointIds: Boolean, + partitionId: Int): RocksDB = { + FailureInjectionRocksDBStateStoreProvider.createRocksDBWithFaultInjection( + dfsRootDir, + conf, + localRootDir, + hadoopConf, + loggingId, + useColumnFamilies, + enableStateStoreCheckpointIds, + partitionId) + } +} + +object FailureInjectionRocksDBStateStoreProvider { + /** + * RocksDBFileManager is created by the RocksDB class, where it creates a default FileSystem. + * Here we make RocksDB create a RocksDBFileManager that uses a different FileSystem. + * */ + def createRocksDBWithFaultInjection( + dfsRootDir: String, + conf: RocksDBConf, + localRootDir: File, + hadoopConf: Configuration, + loggingId: String, + useColumnFamilies: Boolean, + enableStateStoreCheckpointIds: Boolean, + partitionId: Int): RocksDB = { + new RocksDB( + dfsRootDir, + conf = conf, + localRootDir = localRootDir, + hadoopConf = hadoopConf, + loggingId = loggingId, + useColumnFamilies = useColumnFamilies, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + partitionId = partitionId + ) { + override def createFileManager( + dfsRootDir: String, + localTempDir: File, + hadoopConf: Configuration, + codecName: String, + loggingId: String): RocksDBFileManager = { + new RocksDBFileManager( + dfsRootDir, + localTempDir, + hadoopConf, + codecName, + loggingId = loggingId) { + override def getFileSystem( + myDfsRootDir: String, + myHadoopConf: Configuration): FileSystem = { + new FailureInjectionFileSystem(new Path(myDfsRootDir).getFileSystem(myHadoopConf)) + } + } + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala new file mode 100644 index 000000000000..31fc51c4d56f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io._ + +import scala.language.implicitConversions + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions.count +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS +import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.streaming.OutputMode.Update +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.tags.SlowSQLTest +import org.apache.spark.util.Utils + + +@SlowSQLTest +/** Test suite to inject failures into the RocksDB checkpoint */ +class RocksDBCheckpointFailureInjectionSuite extends StreamTest + with SharedSparkSession { + + private val fileManagerClassName = classOf[FailureInjectionCheckpointFileManager].getName + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName) + + } + + override def beforeEach(): Unit = { + super.beforeEach() + } + + /** + * Create a temp dir, register it with the failure injection file system, and remove it at + * the end. The function can be moved to StreamTest if it is used by other tests too. + * @param f the function to run with the temp dir and the injection state + */ + def withTempDirAllowFailureInjection(f: (File, FailureInjectionState) => Unit): Unit = { + withTempDir { dir => + val injectionState = FailureInjectionFileSystem.addPathToTempToInjectionState(dir.getPath) + try { + f(dir, injectionState) + } finally { + FailureInjectionFileSystem.removePathFromTempToInjectionState(dir.getPath) + } + } + } + + implicit def toArray(str: String): Array[Byte] = if (str != null) str.getBytes else null + + case class FailureConf(ifEnableStateStoreCheckpointIds: Boolean, fileType: String) { + override def toString: String = { + s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds, " + + s"fileType = $fileType" + } + } + + Seq( + FailureConf(ifEnableStateStoreCheckpointIds = false, "zip"), + FailureConf(ifEnableStateStoreCheckpointIds = false, "sst"), + FailureConf(ifEnableStateStoreCheckpointIds = true, "zip"), + FailureConf(ifEnableStateStoreCheckpointIds = true, "sst")).foreach { testConf => + test(s"Basic RocksDB SST File Upload Failure Handling $testConf") { + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName) + withTempDirAllowFailureInjection { (remoteDir, injectionState) => + withSQLConf( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false") { + val conf = RocksDBConf(StateStoreConf(SQLConf.get)) + withDB( + remoteDir.getAbsolutePath, + version = 0, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = testConf.ifEnableStateStoreCheckpointIds) { db => + db.put("version", "1.1") + val checkpointId1 = commitAndGetCheckpointId(db) + + if (testConf.fileType == "sst") { + injectionState.failPreCopyFromLocalFileNameRegex = Seq(".*sst") + } else { + assert(testConf.fileType == "zip") + injectionState.failureCreateAtomicRegex = Seq(".*zip") + } + db.put("version", "2.1") + var checkpointId2: Option[String] = None + intercept[IOException] { + checkpointId2 = commitAndGetCheckpointId(db) + } + + db.load(1, checkpointId1) + + injectionState.failPreCopyFromLocalFileNameRegex = Seq.empty
+ injectionState.failureCreateAtomicRegex = Seq.empty + // When ifEnableStateStoreCheckpointIds is true, checkpointId is not available + // to load version 2. If we use None, it will throw a runtime error. We probably + // should categorize this error. + // TODO test ifEnableStateStoreCheckpointIds = true case after it is fixed. + if (!testConf.ifEnableStateStoreCheckpointIds) { + // Make sure that the checkpoint can't be loaded as some files aren't uploaded + // correctly. + val ex = intercept[SparkException] { + db.load(2, checkpointId2) + } + checkError( + ex, + condition = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE", + parameters = Map( + "fileToRead" -> s"$remoteDir/2.changelog" + ) + ) + } + + db.load(0) + injectionState.shouldFailExist = true + intercept[IOException] { + db.load(1, checkpointId1) + } + } + } + } + } + + /** + * This test simulates the case where a previous task had a connectivity problem, couldn't be + * killed, and didn't write the zip file. Only after the later attempt is successfully committed + * does it come back and write the zip file. + */ + Seq(true, false).foreach { ifEnableStateStoreCheckpointIds => + test( + "Zip File Overwritten by Previous Task Checkpoint " + + s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds") { + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName) + withTempDirAllowFailureInjection { (remoteDir, injectionState) => + withSQLConf( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false") { + val conf = RocksDBConf(StateStoreConf(SQLConf.get)) + + var checkpointId2: Option[String] = None + withDB( + remoteDir.getAbsolutePath, + version = 0, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds) { db => + db.put("version", "1.1") + val checkpointId1 = commitAndGetCheckpointId(db) + + db.load(1, checkpointId1) + injectionState.createAtomicDelayCloseRegex = Seq(".*zip") + db.put("version", "2.1") + + intercept[IOException] { + commitAndGetCheckpointId(db) + } + + injectionState.createAtomicDelayCloseRegex = Seq.empty + + db.load(1, checkpointId1) + + db.put("version", "2.2") + checkpointId2 = commitAndGetCheckpointId(db) + + assert(injectionState.delayedStreams.nonEmpty) + injectionState.delayedStreams.foreach(_.close()) + } + withDB( + remoteDir.getAbsolutePath, + version = 2, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds, + checkpointId = checkpointId2) { db => + if (ifEnableStateStoreCheckpointIds) { + // If checkpointV2 is used, a checkpoint file written by a previously failed + // batch should be ignored. + assert(new String(db.get("version"), "UTF-8") == "2.2") + } else { + // Assuming the previous 2.zip overwrites, we should see the previous value. + // This validation isn't necessary here, but we would like to make sure + // FailureInjectionCheckpointFileManager behaves correctly -- it allows zip files + // to be delayed and written later, so that the test for + // ifEnableStateStoreCheckpointIds = true is valid. + assert(new String(db.get("version"), "UTF-8") == "2.1") + } + } + } + } + } + } + + /** + * This test simulates the case where a previous task had a connectivity problem, couldn't be + * killed, and didn't write the changelog file. Only after the later attempt is successfully + * committed does it come back and write the changelog file.
+ * */ + Seq(false, true).foreach { ifEnableStateStoreCheckpointIds => + test( + "Changelog File Overwritten by Previous Task With Changelog Checkpoint " + + s"ifEnableStateStoreCheckpointIds = $ifEnableStateStoreCheckpointIds") { + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName) + withTempDirAllowFailureInjection { (remoteDir, injectionState) => + withSQLConf( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true", + SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "5") { + val conf = RocksDBConf(StateStoreConf(SQLConf.get)) + + var checkpointId2: Option[String] = None + withDB( + remoteDir.getAbsolutePath, + version = 0, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds) { db => + db.put("version", "1.1") + val checkpointId1 = commitAndGetCheckpointId(db) + + injectionState.createAtomicDelayCloseRegex = Seq(".*/2.*changelog") + + db.load(1, checkpointId1) + db.put("version", "2.1") + intercept[IOException] { + commitAndGetCheckpointId(db) + } + + injectionState.createAtomicDelayCloseRegex = Seq.empty + + db.load(1, checkpointId1) + + db.put("version", "2.2") + checkpointId2 = commitAndGetCheckpointId(db) + + assert(injectionState.delayedStreams.nonEmpty) + injectionState.delayedStreams.foreach(_.close()) + + db.load(1, checkpointId1) + db.load(2, checkpointId2) + } + + withDB( + remoteDir.getAbsolutePath, + version = 2, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = ifEnableStateStoreCheckpointIds, + checkpointId = checkpointId2) { db => + if (ifEnableStateStoreCheckpointIds) { + // If checkpointV2 is used, a checkpoint file written by a previously failed + // batch should be ignored. + assert(new String(db.get("version"), "UTF-8") == "2.2") + } else { + // This check is not necessary, but we would like to validate the behavior of + // FailureInjectionFileSystem to ensure the ifEnableStateStoreCheckpointIds = true + // case is valid. + assert(new String(db.get("version"), "UTF-8") == "2.1") + } + } + } + } + } + } + + /** + * This test is to simulate the case where + * 1. A snapshot checkpoint is scheduled + * 2. The batch eventually fails + * 3. The query is retried and moves forward + * 4. The snapshot checkpoint succeeds + * In checkpoint V2, this snapshot shouldn't take effect. Otherwise, it will break the strong + * consistency guaranteed by V2. + */ + test("Delay Snapshot V2") { + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName) + withTempDirAllowFailureInjection { (remoteDir, _) => + withSQLConf( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true", + SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") { + val conf = RocksDBConf(StateStoreConf(SQLConf.get)) + var checkpointId3: Option[String] = None + withDB( + remoteDir.getAbsolutePath, + version = 0, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = true) { db => + db.put("version", "1.1") + val checkpointId1 = commitAndGetCheckpointId(db) + + // Creating another DB which will force a snapshot checkpoint to an older version.
+ withDB( + remoteDir.getAbsolutePath, + version = 1, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = true, + checkpointId = checkpointId1) { db2 => + db2.put("version", "2.1") + db2.commit() + + db.load(1, checkpointId1) + db.put("version", "2.2") + val checkpointId2 = commitAndGetCheckpointId(db) + + db.load(2, checkpointId2) + db.put("foo", "bar") + checkpointId3 = commitAndGetCheckpointId(db) + + db2.doMaintenance() + } + } + withDB( + remoteDir.getAbsolutePath, + version = 3, + conf = conf, + hadoopConf = hadoopConf, + enableStateStoreCheckpointIds = true, + checkpointId = checkpointId3) { db => + // Checkpointing V2 should ignore the snapshot checkpoint from the previous batch. + assert(new String(db.get("version"), "UTF-8") == "2.2") + assert(new String(db.get("foo"), "UTF-8") == "bar") + } + } + } + } + + import testImplicits._ + + /** + * An integrated test where a previous changelog from a failed batch comes back and finishes + * writing. In checkpoint V2, this changelog should be ignored. + * Test it both with and without rename overwriting. + */ + Seq(false, true).foreach { ifAllowRenameOverwrite => + test(s"Job failure with changelog shows up ifAllowRenameOverwrite = $ifAllowRenameOverwrite") { + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName) + val rocksdbChangelogCheckpointingConfKey = + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" + + withTempDirAllowFailureInjection { (checkpointDir, injectionState) => + injectionState.allowOverwriteInRename = ifAllowRenameOverwrite + withSQLConf( + rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") { + val inputData = MemoryStream[Int] + val aggregated = + inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + injectionState.createAtomicDelayCloseRegex = Seq(".*/2_.*changelog") + + testStream(aggregated, Update)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = Map( + rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2", + STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)), + AddData(inputData, 3), + CheckLastBatch((3, 1)), + AddData(inputData, 3, 2), + // The second batch should fail to commit because the changelog file is not uploaded + ExpectFailure[SparkException] { ex => + ex.getCause.getMessage.contains("CANNOT_WRITE_STATE_STORE.CANNOT_COMMIT") + } + ) + assert(injectionState.delayedStreams.nonEmpty) + injectionState.delayedStreams.foreach(_.close()) + injectionState.delayedStreams = Seq.empty + injectionState.createAtomicDelayCloseRegex = Seq.empty + + inputData.addData(3, 1) + + // The query will restart successfully and start at the checkpoint after Batch 1 + testStream(aggregated, Update)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = Map( + rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2", + STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)), + AddData(inputData, 4), + CheckLastBatch((3, 3), (1, 1), (4, 1)), + StopStream + ) + } + } + } + } + + /** + * An integrated test to cover this scenario: + * 1. A batch is running and a snapshot checkpoint is scheduled + * 2. The batch fails + * 3. The query restarts + * 4.
The snapshot checkpoint succeeds + * The test validates that the snapshot checkpoint is not used in a subsequent query restart. + */ + test("Previous Maintenance Snapshot Checkpoint Overwrite") { + val hadoopConf = new Configuration() + hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fileManagerClassName) + val rocksdbChangelogCheckpointingConfKey = + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" + withTempDirAllowFailureInjection { (checkpointDir, injectionState) => + withSQLConf( + rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2") { + val inputData = MemoryStream[Int] + val aggregated = + inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + injectionState.createAtomicDelayCloseRegex = Seq(".*/*zip") + + testStream(aggregated, Update)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = Map( + rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2", + SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "20", + STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)), + AddData(inputData, 3), + CheckAnswer((3, 1)), + AddData(inputData, 3, 2), + AddData(inputData, 3, 1), + CheckNewAnswer((3, 3), (2, 1), (1, 1)), + AddData(inputData, 1), + CheckNewAnswer((1, 2)), + Execute { _ => + // Here we wait for the maintenance thread to try to upload the zip file and fail for + // at least one task. + while (injectionState.delayedStreams.isEmpty) { + Thread.sleep(1) + } + // If we call StateStore.stop(), or let testStream() call it implicitly, it will + // cause a deadlock. This is a workaround until we make StateStore.stop() + // deadlock-free. + StateStore.stopMaintenanceTaskWithoutLock() + }, + StopStream + ) + injectionState.createAtomicDelayCloseRegex = Seq.empty + + // The query should still restart successfully without losing any data. + testStream(aggregated, Update)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = Map( + rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2", + STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)), + AddData(inputData, 1), + CheckAnswer((1, 3)), + StopStream + ) + assert(injectionState.delayedStreams.nonEmpty) + // This will finish uploading the snapshot checkpoint + injectionState.delayedStreams.foreach(_.close()) + injectionState.delayedStreams = Seq.empty + + // After the previous snapshot checkpoint succeeds, the query can still be restarted correctly.
+ testStream(aggregated, Update)( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath, + additionalConfs = Map( + rocksdbChangelogCheckpointingConfKey -> "true", + SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2", + STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> fileManagerClassName)), + AddData(inputData, 3, 1, 4), + CheckAnswer((3, 4), (1, 4), (4, 1)), + StopStream + ) + } + } + } + + def commitAndGetCheckpointId(db: RocksDB): Option[String] = { + val (v, ci) = db.commit() + ci.stateStoreCkptId + } + + def withDB[T]( + remoteDir: String, + version: Int, + conf: RocksDBConf, + hadoopConf: Configuration = new Configuration(), + enableStateStoreCheckpointIds: Boolean = false, + checkpointId: Option[String] = None)( + func: RocksDB => T): T = { + var db: RocksDB = null + try { + db = FailureInjectionRocksDBStateStoreProvider.createRocksDBWithFaultInjection( + remoteDir, + conf = conf, + localRootDir = Utils.createTempDir(), + hadoopConf = hadoopConf, + loggingId = s"[Thread-${Thread.currentThread.getId}]", + useColumnFamilies = true, + enableStateStoreCheckpointIds = enableStateStoreCheckpointIds, + partitionId = 0) + db.load(version, checkpointId) + func(db) + } finally { + if (db != null) { + db.close() + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org