Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8aa419b23 -> 0cceb1bfe


[SPARK-18342] Make rename failures fatal in HDFSBackedStateStore

## What changes were proposed in this pull request?

If the rename operation in the state store fails (`fs.rename` returns `false`), 
the StateStore should throw an exception and have the task retry. Currently, if 
a rename fails, nothing happens immediately during execution. However, subsequent 
snapshot operations fail, and any later attempt at recovery 
(executor failure / checkpoint recovery) also fails.

## How was this patch tested?

Unit test

Author: Burak Yavuz <[email protected]>

Closes #15804 from brkyvz/rename-state.

(cherry picked from commit 6f7ecb0f2975d24a71e4240cf623f5bd8992bbeb)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cceb1bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cceb1bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cceb1bf

Branch: refs/heads/branch-2.0
Commit: 0cceb1bfeea590d38bff1aae29d6802238ec34f2
Parents: 8aa419b
Author: Burak Yavuz <[email protected]>
Authored: Tue Nov 8 15:08:09 2016 -0800
Committer: Tathagata Das <[email protected]>
Committed: Tue Nov 8 15:08:43 2016 -0800

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    |  6 ++-
 .../streaming/state/StateStoreSuite.scala       | 41 +++++++++++++++++---
 2 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0cceb1bf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index bbbb6d3..824fe29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
     synchronized {
       val finalDeltaFile = deltaFile(newVersion)
-      fs.rename(tempDeltaFile, finalDeltaFile)
+      if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+        throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
+      }
       loadedMaps.put(newVersion, map)
       finalDeltaFile
     }
@@ -523,7 +525,7 @@ private[state] class HDFSBackedStateStoreProvider(
 
         val deltaFiles = allFiles.filter { file =>
           file.version > snapshotFile.version && file.version <= version
-        }
+        }.toList
         verify(
           deltaFiles.size == version - snapshotFile.version,
           s"Unexpected list of delta files for version $version for $this: $deltaFiles"

http://git-wip-us.apache.org/repos/asf/spark/blob/0cceb1bf/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index fcf300b..504a265 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File, IOException}
+import java.net.URI
 
 import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
@@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     }
   }
 
+  test("SPARK-18342: commit fails when rename fails") {
+    import RenameReturnsFalseFileSystem._
+    val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString
+    val conf = new Configuration()
+    conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
+    val provider = newStoreProvider(dir = dir, hadoopConf = conf)
+    val store = provider.getStore(0)
+    put(store, "a", 0)
+    val e = intercept[IllegalStateException](store.commit())
+    assert(e.getCause.getMessage.contains("Failed to rename"))
+  }
+
   def getDataFromFiles(
       provider: HDFSBackedStateStoreProvider,
     version: Int = -1): Set[(String, Int)] = {
@@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
   def newStoreProvider(
       opId: Long = Random.nextLong,
       partition: Int = 0,
-      minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get
+      minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
+      dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString,
+      hadoopConf: Configuration = new Configuration()
     ): HDFSBackedStateStoreProvider = {
-    val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
     new HDFSBackedStateStoreProvider(
@@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
       keySchema,
       valueSchema,
       new StateStoreConf(sqlConf),
-      new Configuration())
+      hadoopConf)
   }
 
   def remove(store: StateStore, condition: String => Boolean): Unit = {
@@ -598,3 +612,20 @@ private[state] object StateStoreSuite {
     }}.toSet
   }
 }
+
+/**
+ * Fake FileSystem to test that the StateStore throws an exception while committing the
+ * delta file, when `fs.rename` returns `false`.
+ */
+class RenameReturnsFalseFileSystem extends RawLocalFileSystem {
+  import RenameReturnsFalseFileSystem._
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def rename(src: Path, dst: Path): Boolean = false
+}
+
+object RenameReturnsFalseFileSystem {
+  val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to