This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 68fb5c423a8 [SPARK-40817][K8S][3.2] `spark.files` should preserve remote files
68fb5c423a8 is described below

commit 68fb5c423a819489a67f11ace32c646e3cf70a19
Author: Anton Ippolitov <[email protected]>
AuthorDate: Fri Jan 20 14:43:42 2023 -0800

    [SPARK-40817][K8S][3.2] `spark.files` should preserve remote files
    
    ### What changes were proposed in this pull request?
    Backport https://github.com/apache/spark/pull/38376 to `branch-3.2`
    
    You can find a detailed description of the issue and an example reproduction on the Jira card: https://issues.apache.org/jira/browse/SPARK-40817
    
    The idea for this fix is to update the logic which uploads user-specified files (via `spark.jars`, `spark.files`, etc.) to `spark.kubernetes.file.upload.path`. After uploading local files, it used to overwrite the initial list of URIs passed by the user, thus erasing all remote URIs that were specified there.
    
    Small example of this behaviour:
    1. The user sets `spark.jars` to `s3a://some-bucket/my-application.jar,/tmp/some-local-jar.jar` when running `spark-submit` in cluster mode
    2. `BasicDriverFeatureStep.getAdditionalPodSystemProperties()` gets called at one point while running `spark-submit`
    3. This function would set `spark.jars` to a new value of `${SPARK_KUBERNETES_UPLOAD_PATH}/spark-upload-${RANDOM_STRING}/some-local-jar.jar`. Note that `s3a://some-bucket/my-application.jar` has been discarded.
    
    With the logic proposed in this PR, the new value of `spark.jars` would be `s3a://some-bucket/my-application.jar,${SPARK_KUBERNETES_UPLOAD_PATH}/spark-upload-${RANDOM_STRING}/some-local-jar.jar`, so remote URIs are no longer discarded.
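    
    A minimal Scala sketch of the idea behind the fix, assuming a hypothetical `uploadToUploadPath` helper standing in for the real `KubernetesUtils.uploadAndTransformFileUris` (the actual change also handles `ARCHIVES` fragments):
    ```
    // Hypothetical stand-in for the real upload step (illustration only)
    def uploadToUploadPath(uri: String): String =
      s"s3a://my-upload-path/spark-upload-1234/${uri.split('/').last}"

    val uris = Seq("s3a://some-bucket/my-application.jar", "/tmp/some-local-jar.jar")
    // Partition into local URIs (to be uploaded) and remote URIs (left untouched);
    // the real code uses KubernetesUtils.isLocalAndResolvable for this check
    val (localUris, remoteUris) = uris.partition(uri => !uri.contains("://"))
    // Upload only the local files, then append the remote URIs back instead of dropping them
    val resolved = localUris.map(uploadToUploadPath)
    val newValue = (resolved ++ remoteUris).mkString(",")
    // newValue now contains both the uploaded local jar and the original remote jar
    ```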
    
    ### Why are the changes needed?
    We encountered this issue in production when trying to launch Spark on Kubernetes jobs in cluster mode with a mix of local and remote dependencies.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, see the description of the new behaviour above.
    
    ### How was this patch tested?
    - Added a unit test for the new behaviour
    - Added an integration test for the new behaviour
    - Tried this patch in our Kubernetes environment with `SparkPi`:
    ```
    spark-submit \
      --master k8s://https://$KUBERNETES_API_SERVER_URL:443 \
      --deploy-mode cluster \
      --name=spark-submit-test \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.jars=/opt/my-local-jar.jar,s3a://$BUCKET_NAME/my-remote-jar.jar \
      --conf spark.kubernetes.file.upload.path=s3a://$BUCKET_NAME/my-upload-path/ \
      [...]
      /opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar
    ```
    Before applying the patch, `s3a://$BUCKET_NAME/my-remote-jar.jar` was discarded from the final value of `spark.jars`. After applying the patch and launching the job again, I confirmed that `s3a://$BUCKET_NAME/my-remote-jar.jar` was no longer discarded by looking at the Spark config for the running job.
    
    Closes #39670 from antonipp/spark-40817-branch-3.2.
    
    Authored-by: Anton Ippolitov <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../k8s/features/BasicDriverFeatureStep.scala      | 12 ++--
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 41 +++++++++++++
 .../k8s/integrationtest/DepsTestsSuite.scala       | 67 ++++++++++++++++++++--
 3 files changed, 110 insertions(+), 10 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 7f34f30d599..d8a3e960d82 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -162,27 +162,27 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
     Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
-      val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
+      val (localUris, remoteUris) =
+        conf.get(key).partition(uri => KubernetesUtils.isLocalAndResolvable(uri))
       val value = {
         if (key == ARCHIVES) {
-          uris.map(UriBuilder.fromUri(_).fragment(null).build()).map(_.toString)
+          localUris.map(UriBuilder.fromUri(_).fragment(null).build()).map(_.toString)
         } else {
-          uris
+          localUris
         }
       }
       val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
       if (resolved.nonEmpty) {
         val resolvedValue = if (key == ARCHIVES) {
-          uris.zip(resolved).map { case (uri, r) =>
+          localUris.zip(resolved).map { case (uri, r) =>
             UriBuilder.fromUri(r).fragment(new java.net.URI(uri).getFragment).build().toString
           }
         } else {
           resolved
         }
-        additionalProps.put(key.key, resolvedValue.mkString(","))
+        additionalProps.put(key.key, (resolvedValue ++ remoteUris).mkString(","))
       }
     }
     additionalProps.toMap
   }
 }
-
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 4e1747a8c02..9384daf5e85 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder, Quantity}
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
@@ -232,6 +233,33 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     assert(portMap2(BLOCK_MANAGER_PORT_NAME) === 1235)
   }
 
+  test("SPARK-40817: Check that remote JARs do not get discarded in 
spark.jars") {
+    val FILE_UPLOAD_PATH = "s3a://some-bucket/upload-path"
+    val REMOTE_JAR_URI = "s3a://some-bucket/my-application.jar"
+    val LOCAL_JAR_URI = "/tmp/some-local-jar.jar"
+
+    val sparkConf = new SparkConf()
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(JARS, Seq(REMOTE_JAR_URI, LOCAL_JAR_URI))
+      .set(KUBERNETES_FILE_UPLOAD_PATH, FILE_UPLOAD_PATH)
+      // Instead of using the real S3A Hadoop driver, use a fake local one
+      .set("spark.hadoop.fs.s3a.impl", 
classOf[TestFileSystem].getCanonicalName)
+      .set("spark.hadoop.fs.s3a.impl.disable.cache", "true")
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
+    val featureStep = new BasicDriverFeatureStep(kubernetesConf)
+
+    val sparkJars = featureStep.getAdditionalPodSystemProperties()(JARS.key).split(",")
+
+    // Both the remote and the local JAR should be there
+    assert(sparkJars.size == 2)
+    // The remote JAR path should have been left untouched
+    assert(sparkJars.contains(REMOTE_JAR_URI))
+    // The local JAR should have been uploaded to spark.kubernetes.file.upload.path
+    assert(!sparkJars.contains(LOCAL_JAR_URI))
+    assert(sparkJars.exists(path =>
+      path.startsWith(FILE_UPLOAD_PATH) && 
path.endsWith("some-local-jar.jar")))
+  }
+
   def containerPort(name: String, portNumber: Int): ContainerPort =
     new ContainerPortBuilder()
       .withName(name)
@@ -241,3 +269,16 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
   private def amountAndFormat(quantity: Quantity): String = quantity.getAmount + quantity.getFormat
 }
+
+/**
+ * No-op Hadoop FileSystem
+ */
+private class TestFileSystem extends LocalFileSystem {
+  override def copyFromLocalFile(
+    delSrc: Boolean,
+    overwrite: Boolean,
+    src: Path,
+    dst: Path): Unit = {}
+
+  override def mkdirs(path: Path): Boolean = true
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index 3f3c4ef1460..663f8147c7a 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -32,7 +32,7 @@ import org.scalatest.time.{Minutes, Span}
 
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
-import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, SPARK_PI_MAIN_CLASS, TIMEOUT}
 import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName
 import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
 import org.apache.spark.internal.config.{ARCHIVES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
@@ -167,6 +167,42 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     })
   }
 
+  test(
+    "SPARK-40817: Check that remote files do not get discarded in spark.files",
+    k8sTestTag,
+    MinikubeTag) {
+    tryDepsTest({
+      // Create a local file
+      val localFileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+
+      // Create a remote file on S3
+      val remoteFileName = "some-remote-file.txt"
+      val remoteFileKey = s"some-path/${remoteFileName}"
+      createS3Object(remoteFileKey, "Some Content")
+      val remoteFileFullPath = s"s3a://${BUCKET}/${remoteFileKey}"
+
+      // Put both file paths in spark.files
+      sparkAppConf.set("spark.files", 
s"$HOST_PATH/$localFileName,${remoteFileFullPath}")
+      // Allows to properly read executor logs once the job is finished
+      sparkAppConf.set("spark.kubernetes.executor.deleteOnTermination", 
"false")
+
+      // Run SparkPi and make sure that both files have been properly downloaded on running pods
+      val examplesJar = Utils.getTestFileAbsolutePath(getExamplesJarName(), sparkHomeDir)
+      runSparkApplicationAndVerifyCompletion(
+        appResource = examplesJar,
+        mainClass = SPARK_PI_MAIN_CLASS,
+        appArgs = Array(),
+        expectedDriverLogOnCompletion = Seq("Pi is roughly 3"),
+        // We can check whether the Executor pod has successfully
+        // downloaded both the local and the remote file
+        expectedExecutorLogOnCompletion = Seq(localFileName, remoteFileName),
+        driverPodChecker = doBasicDriverPodCheck,
+        executorPodChecker = doBasicExecutorPodCheck,
+        isJVM = true
+      )
+    })
+  }
+
   test("SPARK-33615: Launcher client archives", k8sTestTag, MinikubeTag) {
     tryDepsTest {
       val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
@@ -259,12 +295,20 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  private def getS3Client(
+      endPoint: String,
+      accessKey: String = ACCESS_KEY,
+      secretKey: String = SECRET_KEY): AmazonS3Client = {
+    val credentials = new BasicAWSCredentials(accessKey, secretKey)
+    val s3client = new AmazonS3Client(credentials)
+    s3client.setEndpoint(endPoint)
+    s3client
+  }
+
   private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
     Eventually.eventually(TIMEOUT, INTERVAL) {
       try {
-        val credentials = new BasicAWSCredentials(accessKey, secretKey)
-        val s3client = new AmazonS3Client(credentials)
-        s3client.setEndpoint(endPoint)
+        val s3client = getS3Client(endPoint, accessKey, secretKey)
         s3client.createBucket(BUCKET)
       } catch {
         case e: Exception =>
@@ -273,6 +317,21 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  private def createS3Object(
+      objectKey: String,
+      objectContent: String,
+      endPoint: String = getServiceUrl(svcName)): Unit = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      try {
+        val s3client = getS3Client(endPoint)
+        s3client.putObject(BUCKET, objectKey, objectContent)
+      } catch {
+        case e: Exception =>
+          throw new SparkException(s"Failed to create object 
$BUCKET/$objectKey.", e)
+      }
+    }
+  }
+
   private def getServiceUrl(serviceName: String): String = {
     val fuzzyUrlMatcher = """^(.*?)([a-zA-Z]+://.*?)(\s*)$""".r
     Eventually.eventually(TIMEOUT, INTERVAL) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
