This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 3c07a3e7e5d [SPARK-44272][YARN] Path Inconsistency when Operating statCache within Yarn Client
3c07a3e7e5d is described below

commit 3c07a3e7e5dba165f2c579bb4969405bc9d4f20b
Author: Shu Wang <[email protected]>
AuthorDate: Wed Jul 19 02:18:59 2023 -0500

    [SPARK-44272][YARN] Path Inconsistency when Operating statCache within Yarn Client
    
    ### What changes were proposed in this pull request?
    
    1. In `addResource`, change `statCache.getOrElse` to `statCache.getOrElseUpdate` (via `getFileStatus`) so that the corresponding FileStatus is cached into `statCache`.
    2. Change the `Path` parameter of `checkPermissionOfOther` and `ancestorsHaveExecutePermissions` to `URI`, and have `isPublic` pass the original `URI` through instead of converting it to a `Path`.
    3. Add a `getParentURI` method to construct the parent URI.
    
    ### Why are the changes needed?
    
    We should not use `uri.getPath()` when constructing the `Path`, because it does not retain information such as the scheme. This means that `statCache` was not really taking effect.
    For example, if the URI is `file:/foo.invalid.com:8080/tmp/testing`, then:
    
    ```
    uri.getPath -> /foo.invalid.com:8080/tmp/testing
    uri.toString -> file:/foo.invalid.com:8080/tmp/testing
    ```
    Please see the JIRA [ticket](https://issues.apache.org/jira/browse/SPARK-44272) for more details.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests to validate that the FileStatus is cached as expected.
    
    Closes #41821 from shuwang21/fixcache.
    
    Lead-authored-by: Shu Wang <[email protected]>
    Co-authored-by: Shu Wang <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 0879a25c4c271dea6cd8f2a45e5c5e6e6743a962)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../yarn/ClientDistributedCacheManager.scala       | 41 +++++++++++-----
 .../yarn/ClientDistributedCacheManagerSuite.scala  | 55 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 13 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 8add7442888..6d50b5e4fd2 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -68,7 +68,7 @@ private[spark] class ClientDistributedCacheManager() extends 
Logging {
       link: String,
       statCache: Map[URI, FileStatus],
       appMasterOnly: Boolean = false): Unit = {
-    val destStatus = statCache.getOrElse(destPath.toUri(), 
fs.getFileStatus(destPath))
+    val destStatus = getFileStatus(fs, destPath.toUri, statCache)
     val amJarRsrc = Records.newRecord(classOf[LocalResource])
     amJarRsrc.setType(resourceType)
     val visibility = getVisibility(conf, destPath.toUri(), statCache)
@@ -119,46 +119,61 @@ private[spark] class ClientDistributedCacheManager() 
extends Logging {
    */
   private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, 
FileStatus]): Boolean = {
     val fs = FileSystem.get(uri, conf)
-    val current = new Path(uri.getPath())
     // the leaf level file should be readable by others
-    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+    if (!checkPermissionOfOther(fs, uri, FsAction.READ, statCache)) {
       return false
     }
-    ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+    ancestorsHaveExecutePermissions(fs, getParentURI(uri), statCache)
   }
 
   /**
-   * Returns true if all ancestors of the specified path have the 'execute'
+   * Get the Parent URI of the given URI. Notes that the query & fragment of 
original URI will not
+   * be inherited when obtaining parent URI.
+   *
+   * @return the parent URI, null if the given uri is the root
+   */
+  private[yarn] def getParentURI(uri: URI): URI = {
+    val path = new Path(uri.toString)
+    val parent = path.getParent()
+    if (parent == null) {
+      null
+    } else {
+      parent.toUri()
+    }
+  }
+
+  /**
+   * Returns true if all ancestors of the specified uri have the 'execute'
    * permission set for all users (i.e. that other users can traverse
-   * the directory hierarchy to the given path)
+   * the directory hierarchy to the given uri)
    * @return true if all ancestors have the 'execute' permission set for all 
users
    */
   private def ancestorsHaveExecutePermissions(
       fs: FileSystem,
-      path: Path,
+      uri: URI,
       statCache: Map[URI, FileStatus]): Boolean = {
-    var current = path
+    var current = uri
     while (current != null) {
-      // the subdirs in the path should have execute permissions for others
+      // the subdirs in the corresponding uri path should have execute 
permissions for others
       if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
         return false
       }
-      current = current.getParent()
+      current = getParentURI(current)
     }
     true
   }
 
   /**
-   * Checks for a given path whether the Other permissions on it
+   * Checks for a given URI whether the Other permissions on it
    * imply the permission in the passed FsAction
    * @return true if the path in the uri is visible to all, false otherwise
    */
   private def checkPermissionOfOther(
       fs: FileSystem,
-      path: Path,
+      uri: URI,
       action: FsAction,
       statCache: Map[URI, FileStatus]): Boolean = {
-    val status = getFileStatus(fs, path.toUri(), statCache)
+    val status = getFileStatus(fs, uri, statCache)
     val perms = status.getPermission()
     val otherAction = perms.getOtherAction()
     otherAction.implies(action)
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 996654f7415..4e8971cbfa0 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.net.URI
 
+import scala.collection.mutable
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.Map
 
@@ -44,6 +45,60 @@ class ClientDistributedCacheManagerSuite extends 
SparkFunSuite with MockitoSugar
     }
   }
 
+  test("SPARK-44272: test addResource added FileStatus to statCache and 
getVisibility can read" +
+    " from statCache") {
+    val distMgr = new ClientDistributedCacheManager() {
+      override private[yarn] def getFileStatus(fs: FileSystem, uri: URI,
+        statCache: mutable.Map[URI, FileStatus]): FileStatus = {
+        statCache.getOrElseUpdate(uri, new FileStatus())
+      }
+    }
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPathA = new Path("file:///foo.invalid.com:8080/tmp/A")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    distMgr.addResource(fs, conf, destPathA, localResources, 
LocalResourceType.FILE, "link",
+      statCache, false)
+    assert(statCache.size === 2)
+    assert(statCache.contains(destPathA.toUri))
+    assert(statCache.contains(destPathA.getParent.toUri))
+
+    val destPathB = new Path("file:///foo.invalid.com:8080/tmp/B")
+    distMgr.addResource(fs, conf, destPathB, localResources, 
LocalResourceType.FILE, "link",
+      statCache, false)
+    assert(statCache.size === 3)
+    assert(statCache.contains(destPathB.toUri))
+
+    val destPathC = new Path("file:///foo.invalid.com:8080/root/C")
+    distMgr.addResource(fs, conf, destPathC, localResources, 
LocalResourceType.FILE, "link",
+      statCache, false)
+    assert(statCache.size === 5)
+    assert(statCache.contains(destPathC.toUri))
+    assert(statCache.contains(destPathC.getParent.toUri))
+  }
+
+  test("SPARK-44272: test getParentURI") {
+    val distMgr = new ClientDistributedCacheManager()
+    val scheme = "file"
+    val userInfo = "user"
+    val host = "foo.com"
+    val port = 8080
+    val path = "/tmp/testing"
+    val uri = new URI(scheme, userInfo, host, port, path, null, null)
+    val parentURI = distMgr.getParentURI(uri)
+    assert(uri.getScheme === parentURI.getScheme)
+    assert(uri.getUserInfo === parentURI.getUserInfo)
+    assert(uri.getHost === parentURI.getHost)
+    assert(uri.getPort === parentURI.getPort)
+    assert(new Path(uri.getPath).getParent.toString === parentURI.getPath)
+
+    val rootPath = "/"
+    val parentRootURI = distMgr.getParentURI(
+      new URI(scheme, userInfo, host, port, rootPath, null, null))
+    assert(parentRootURI === null)
+  }
+
   test("test getFileStatus empty") {
     val distMgr = new ClientDistributedCacheManager()
     val fs = mock[FileSystem]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to