Repository: spark
Updated Branches:
  refs/heads/master e4ce1bc4f -> 6563d72b1


[SPARK-15664][MLLIB] Replace FileSystem.get(conf) with path.getFileSystem(conf) 
when removing CheckpointFile in MLlib

## What changes were proposed in this pull request?
If the SparkContext checkpoint directory is set to a directory on a FileSystem other than the default one, MLlib throws an exception (Wrong FS) when removing checkpoint files.
So we should always obtain the FileSystem from the Path itself to avoid this wrong-FS problem.
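
A minimal sketch of the pattern this patch applies (illustrative only; the object and method names below are hypothetical, not the exact patched code). It mirrors the change to `PeriodicCheckpointer.removeCheckpointFile`: resolve the `FileSystem` from the checkpoint `Path` instead of calling `FileSystem.get(conf)`, which always returns the default filesystem.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckpointCleanupSketch {
  // Problematic pattern: FileSystem.get(conf) returns the *default* filesystem,
  // so deleting a checkpoint that lives on a different filesystem (other scheme
  // or authority) fails with a "Wrong FS" error.
  def removeWithDefaultFs(checkpointFile: String, conf: Configuration): Unit = {
    val fs = FileSystem.get(conf)
    fs.delete(new Path(checkpointFile), true)
  }

  // Fixed pattern: derive the FileSystem from the Path, so the filesystem that
  // actually owns the checkpoint directory is used for the delete.
  def removeWithPathFs(checkpointFile: String, conf: Configuration): Unit = {
    val path = new Path(checkpointFile)
    val fs = path.getFileSystem(conf)
    fs.delete(path, true)
  }
}
```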
## How was this patch tested?
N/A

Author: Lianhui Wang <[email protected]>

Closes #13408 from lianhuiwang/SPARK-15664.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6563d72b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6563d72b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6563d72b

Branch: refs/heads/master
Commit: 6563d72b168c39115376e73788b48a2d60803d4e
Parents: e4ce1bc
Author: Lianhui Wang <[email protected]>
Authored: Wed Jun 1 08:30:38 2016 -0500
Committer: Sean Owen <[email protected]>
Committed: Wed Jun 1 08:30:38 2016 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/clustering/LDA.scala   |  6 +++---
 .../org/apache/spark/ml/tree/impl/NodeIdCache.scala  | 14 +++++++++-----
 .../spark/mllib/impl/PeriodicCheckpointer.scala      | 15 +++++++++------
 .../mllib/impl/PeriodicGraphCheckpointerSuite.scala  |  8 +++++---
 .../mllib/impl/PeriodicRDDCheckpointerSuite.scala    |  8 +++++---
 5 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6563d72b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index ec60991..5aec692 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ml.clustering
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
@@ -696,8 +696,8 @@ class DistributedLDAModel private[ml] (
   @DeveloperApi
   @Since("2.0.0")
   def deleteCheckpointFiles(): Unit = {
-    val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
-    _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+    val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
+    _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, hadoopConf))
     _checkpointFiles = Array.empty[String]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6563d72b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
index 800430f..a7c5f48 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
@@ -21,7 +21,7 @@ import java.io.IOException
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.tree.{LearningNode, Split}
@@ -77,8 +77,8 @@ private[spark] class NodeIdCache(
   // Indicates whether we can checkpoint
  private val canCheckpoint = nodeIdsForInstances.sparkContext.getCheckpointDir.nonEmpty
 
-  // FileSystem instance for deleting checkpoints as needed
-  private val fs = FileSystem.get(nodeIdsForInstances.sparkContext.hadoopConfiguration)
+  // Hadoop Configuration for deleting checkpoints as needed
+  private val hadoopConf = nodeIdsForInstances.sparkContext.hadoopConfiguration
 
   /**
    * Update the node index values in the cache.
@@ -130,7 +130,9 @@ private[spark] class NodeIdCache(
           val old = checkpointQueue.dequeue()
          // Since the old checkpoint is not deleted by Spark, we'll manually delete it here.
           try {
-            fs.delete(new Path(old.getCheckpointFile.get), true)
+            val path = new Path(old.getCheckpointFile.get)
+            val fs = path.getFileSystem(hadoopConf)
+            fs.delete(path, true)
           } catch {
             case e: IOException =>
               logError("Decision Tree learning using cacheNodeIds failed to 
remove checkpoint" +
@@ -154,7 +156,9 @@ private[spark] class NodeIdCache(
       val old = checkpointQueue.dequeue()
       if (old.getCheckpointFile.isDefined) {
         try {
-          fs.delete(new Path(old.getCheckpointFile.get), true)
+          val path = new Path(old.getCheckpointFile.get)
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
         } catch {
           case e: IOException =>
             logError("Decision Tree learning using cacheNodeIds failed to 
remove checkpoint" +

http://git-wip-us.apache.org/repos/asf/spark/blob/6563d72b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
index 5c12c93..4dd498c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
@@ -19,7 +19,8 @@ package org.apache.spark.mllib.impl
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
@@ -160,21 +161,23 @@ private[mllib] abstract class PeriodicCheckpointer[T](
   private def removeCheckpointFile(): Unit = {
     val old = checkpointQueue.dequeue()
     // Since the old checkpoint is not deleted by Spark, we manually delete it.
-    val fs = FileSystem.get(sc.hadoopConfiguration)
-    getCheckpointFiles(old).foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+    getCheckpointFiles(old).foreach(
+      PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration))
   }
 }
 
 private[spark] object PeriodicCheckpointer extends Logging {
 
   /** Delete a checkpoint file, and log a warning if deletion fails. */
-  def removeCheckpointFile(path: String, fs: FileSystem): Unit = {
+  def removeCheckpointFile(checkpointFile: String, conf: Configuration): Unit = {
     try {
-      fs.delete(new Path(path), true)
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(conf)
+      fs.delete(path, true)
     } catch {
       case e: Exception =>
         logWarning("PeriodicCheckpointer could not remove old checkpoint file: 
" +
-          path)
+          checkpointFile)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6563d72b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
index e331c75..a13e7f6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.impl
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.graphx.{Edge, Graph}
@@ -140,9 +140,11 @@ private object PeriodicGraphCheckpointerSuite {
     //       Instead, we check for the presence of the checkpoint files.
    //       This test should continue to work even after this graph.isCheckpointed issue
    //       is fixed (though it can then be simplified and not look for the files).
-    val fs = FileSystem.get(graph.vertices.sparkContext.hadoopConfiguration)
+    val hadoopConf = graph.vertices.sparkContext.hadoopConfiguration
     graph.getCheckpointFiles.foreach { checkpointFile =>
-      assert(!fs.exists(new Path(checkpointFile)),
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(hadoopConf)
+      assert(!fs.exists(path),
         "Graph checkpoint file should have been removed")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6563d72b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
index b2a459a..14adf8c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.impl
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -127,9 +127,11 @@ private object PeriodicRDDCheckpointerSuite {
     //       Instead, we check for the presence of the checkpoint files.
    //       This test should continue to work even after this rdd.isCheckpointed issue
    //       is fixed (though it can then be simplified and not look for the files).
-    val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
+    val hadoopConf = rdd.sparkContext.hadoopConfiguration
     rdd.getCheckpointFile.foreach { checkpointFile =>
-      assert(!fs.exists(new Path(checkpointFile)), "RDD checkpoint file should have been removed")
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(hadoopConf)
+      assert(!fs.exists(path), "RDD checkpoint file should have been removed")
     }
   }
 

