This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 3c07a3e7e5d [SPARK-44272][YARN] Path Inconsistency when Operating
statCache within Yarn Client
3c07a3e7e5d is described below
commit 3c07a3e7e5dba165f2c579bb4969405bc9d4f20b
Author: Shu Wang <[email protected]>
AuthorDate: Wed Jul 19 02:18:59 2023 -0500
[SPARK-44272][YARN] Path Inconsistency when Operating statCache within Yarn
Client
### What changes were proposed in this pull request?
1. Change `statCache.getOrElse` to `statCache.getOrElseUpdate` so that the
corresponding FileStatus can be cached into `statCache`
2. Change the `Path` parameter of `checkPermissionOfOther` and
`ancestorsHaveExecutePermissions` to `URI`, and have `isPublic` pass its `URI`
through instead of building a `Path` from `uri.getPath()`.
3. Add a `getParentURI` method for constructing the parent URI.
### Why are the changes needed?
We should not use `uri.getPath()` when constructing the `Path`, because the
result does not retain information such as the scheme. As a consequence, the
keys used with `statCache` are inconsistent and the cache does not really take
effect.
For example, if uri is "file:/foo.invalid.com:8080/tmp/testing", then
```
uri.getPath -> /foo.invalid.com:8080/tmp/testing
uri.toString -> file:/foo.invalid.com:8080/tmp/testing
```
Please also see more details from JIRA
[ticket](https://issues.apache.org/jira/browse/SPARK-44272).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests to validate that the `FileStatus` is cached as expected and
that `getParentURI` returns the expected parent URI.
Closes #41821 from shuwang21/fixcache.
Lead-authored-by: Shu Wang <[email protected]>
Co-authored-by: Shu Wang <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0879a25c4c271dea6cd8f2a45e5c5e6e6743a962)
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../yarn/ClientDistributedCacheManager.scala | 41 +++++++++++-----
.../yarn/ClientDistributedCacheManagerSuite.scala | 55 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 13 deletions(-)
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 8add7442888..6d50b5e4fd2 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -68,7 +68,7 @@ private[spark] class ClientDistributedCacheManager() extends
Logging {
link: String,
statCache: Map[URI, FileStatus],
appMasterOnly: Boolean = false): Unit = {
- val destStatus = statCache.getOrElse(destPath.toUri(),
fs.getFileStatus(destPath))
+ val destStatus = getFileStatus(fs, destPath.toUri, statCache)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(resourceType)
val visibility = getVisibility(conf, destPath.toUri(), statCache)
@@ -119,46 +119,61 @@ private[spark] class ClientDistributedCacheManager()
extends Logging {
*/
private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI,
FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
- val current = new Path(uri.getPath())
// the leaf level file should be readable by others
- if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+ if (!checkPermissionOfOther(fs, uri, FsAction.READ, statCache)) {
return false
}
- ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+ ancestorsHaveExecutePermissions(fs, getParentURI(uri), statCache)
}
/**
- * Returns true if all ancestors of the specified path have the 'execute'
+ * Get the Parent URI of the given URI. Notes that the query & fragment of
original URI will not
+ * be inherited when obtaining parent URI.
+ *
+ * @return the parent URI, null if the given uri is the root
+ */
+ private[yarn] def getParentURI(uri: URI): URI = {
+ val path = new Path(uri.toString)
+ val parent = path.getParent()
+ if (parent == null) {
+ null
+ } else {
+ parent.toUri()
+ }
+ }
+
+ /**
+ * Returns true if all ancestors of the specified uri have the 'execute'
* permission set for all users (i.e. that other users can traverse
- * the directory hierarchy to the given path)
+ * the directory hierarchy to the given uri)
* @return true if all ancestors have the 'execute' permission set for all
users
*/
private def ancestorsHaveExecutePermissions(
fs: FileSystem,
- path: Path,
+ uri: URI,
statCache: Map[URI, FileStatus]): Boolean = {
- var current = path
+ var current = uri
while (current != null) {
- // the subdirs in the path should have execute permissions for others
+ // the subdirs in the corresponding uri path should have execute
permissions for others
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false
}
- current = current.getParent()
+ current = getParentURI(current)
}
true
}
/**
- * Checks for a given path whether the Other permissions on it
+ * Checks for a given URI whether the Other permissions on it
* imply the permission in the passed FsAction
* @return true if the path in the uri is visible to all, false otherwise
*/
private def checkPermissionOfOther(
fs: FileSystem,
- path: Path,
+ uri: URI,
action: FsAction,
statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache)
+ val status = getFileStatus(fs, uri, statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
otherAction.implies(action)
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 996654f7415..4e8971cbfa0 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
import java.net.URI
+import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
@@ -44,6 +45,60 @@ class ClientDistributedCacheManagerSuite extends
SparkFunSuite with MockitoSugar
}
}
+ test("SPARK-44272: test addResource added FileStatus to statCache and
getVisibility can read" +
+ " from statCache") {
+ val distMgr = new ClientDistributedCacheManager() {
+ override private[yarn] def getFileStatus(fs: FileSystem, uri: URI,
+ statCache: mutable.Map[URI, FileStatus]): FileStatus = {
+ statCache.getOrElseUpdate(uri, new FileStatus())
+ }
+ }
+ val fs = mock[FileSystem]
+ val conf = new Configuration()
+ val destPathA = new Path("file:///foo.invalid.com:8080/tmp/A")
+ val localResources = HashMap[String, LocalResource]()
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ distMgr.addResource(fs, conf, destPathA, localResources,
LocalResourceType.FILE, "link",
+ statCache, false)
+ assert(statCache.size === 2)
+ assert(statCache.contains(destPathA.toUri))
+ assert(statCache.contains(destPathA.getParent.toUri))
+
+ val destPathB = new Path("file:///foo.invalid.com:8080/tmp/B")
+ distMgr.addResource(fs, conf, destPathB, localResources,
LocalResourceType.FILE, "link",
+ statCache, false)
+ assert(statCache.size === 3)
+ assert(statCache.contains(destPathB.toUri))
+
+ val destPathC = new Path("file:///foo.invalid.com:8080/root/C")
+ distMgr.addResource(fs, conf, destPathC, localResources,
LocalResourceType.FILE, "link",
+ statCache, false)
+ assert(statCache.size === 5)
+ assert(statCache.contains(destPathC.toUri))
+ assert(statCache.contains(destPathC.getParent.toUri))
+ }
+
+ test("SPARK-44272: test getParentURI") {
+ val distMgr = new ClientDistributedCacheManager()
+ val scheme = "file"
+ val userInfo = "user"
+ val host = "foo.com"
+ val port = 8080
+ val path = "/tmp/testing"
+ val uri = new URI(scheme, userInfo, host, port, path, null, null)
+ val parentURI = distMgr.getParentURI(uri)
+ assert(uri.getScheme === parentURI.getScheme)
+ assert(uri.getUserInfo === parentURI.getUserInfo)
+ assert(uri.getHost === parentURI.getHost)
+ assert(uri.getPort === parentURI.getPort)
+ assert(new Path(uri.getPath).getParent.toString === parentURI.getPath)
+
+ val rootPath = "/"
+ val parentRootURI = distMgr.getParentURI(
+ new URI(scheme, userInfo, host, port, rootPath, null, null))
+ assert(parentRootURI === null)
+ }
+
test("test getFileStatus empty") {
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]