Repository: spark
Updated Branches:
  refs/heads/master 182da81e9 -> 123f0041d


[SPARK-25291][K8S] Fixing Flakiness of Executor Pod tests

## What changes were proposed in this pull request?

Fixes flakiness in the PySpark tests, where executor pods were sometimes not
checked at all because no executor pod existed yet when the test listed them.

Also fixes the executor configuration (`spark.executor.memory` was set to 500m),
which caused tests to fail once executors *were* actually tested.

## How was this patch tested?

Unit and integration tests.

Closes #22415 from ifilonenko/SPARK-25291.

Authored-by: Ilan Filonenko <[email protected]>
Signed-off-by: Yinan Li <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/123f0041
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/123f0041
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/123f0041

Branch: refs/heads/master
Commit: 123f0041d534f28e14343aafb4e5cec19dde14ad
Parents: 182da81
Author: Ilan Filonenko <[email protected]>
Authored: Tue Sep 18 11:43:35 2018 -0700
Committer: Yinan Li <[email protected]>
Committed: Tue Sep 18 11:43:35 2018 -0700

----------------------------------------------------------------------
 .../k8s/integrationtest/KubernetesSuite.scala   | 35 ++++++++++++++------
 .../KubernetesTestComponents.scala              |  1 -
 .../k8s/integrationtest/SecretsTestsSuite.scala |  3 +-
 3 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/123f0041/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 18541ba..c99a907 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -23,7 +23,10 @@ import java.util.regex.Pattern
 
 import com.google.common.io.PatternFilenameFilter
 import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
+import org.scalatest.Matchers
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
 import scala.collection.JavaConverters._
@@ -31,10 +34,12 @@ import scala.collection.JavaConverters._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.integrationtest.TestConfig._
 import 
org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, 
IntegrationTestBackendFactory}
+import org.apache.spark.internal.Logging
 
 private[spark] class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with 
SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite {
+  with PythonTestsSuite with ClientModeTestsSuite
+  with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
 
@@ -223,17 +228,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       .getItems
       .get(0)
     driverPodChecker(driverPod)
-
-    val executorPods = kubernetesTestComponents.kubernetesClient
+    val execPods = scala.collection.mutable.Map[String, Pod]()
+    val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "executor")
-      .list()
-      .getItems
-    executorPods.asScala.foreach { pod =>
-      executorPodChecker(pod)
-    }
-
+      .watch(new Watcher[Pod] {
+        logInfo("Beginning watch of executors")
+        override def onClose(cause: KubernetesClientException): Unit =
+          logInfo("Ending watch of executors")
+        override def eventReceived(action: Watcher.Action, resource: Pod): 
Unit = {
+          val name = resource.getMetadata.getName
+          action match {
+            case Action.ADDED | Action.MODIFIED =>
+              execPods(name) = resource
+            case Action.DELETED | Action.ERROR =>
+              execPods.remove(name)
+          }
+        }
+      })
+    Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should 
be (true) }
+    execWatcher.close()
+    execPods.values.foreach(executorPodChecker(_))
     Eventually.eventually(TIMEOUT, INTERVAL) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
@@ -244,7 +260,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite
       }
     }
   }
-
   protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getMetadata.getName === driverPodName)
     assert(driverPod.getSpec.getContainers.get(0).getImage === image)

http://git-wip-us.apache.org/repos/asf/spark/blob/123f0041/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index b602fdf..5615d61 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -62,7 +62,6 @@ private[spark] class KubernetesTestComponents(defaultClient: 
DefaultKubernetesCl
     new SparkAppConf()
       .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}")
       .set("spark.kubernetes.namespace", namespace)
-      .set("spark.executor.memory", "500m")
       .set("spark.executor.cores", "1")
       .set("spark.executors.instances", "1")
       .set("spark.app.name", "spark-test-app")

http://git-wip-us.apache.org/repos/asf/spark/blob/123f0041/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index 9b039bb..b18a6ae 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Pod, Secret, SecretBuilder}
+import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
 import org.apache.commons.codec.binary.Base64
 import org.apache.commons.io.output.ByteArrayOutputStream
 import org.scalatest.concurrent.Eventually
@@ -53,7 +53,6 @@ private[spark] trait SecretsTestsSuite { k8sSuite: 
KubernetesSuite =>
       .delete()
   }
 
-  // TODO: [SPARK-25291] This test is flaky with regards to memory of executors
   test("Run SparkPi with env and mount secrets.", k8sTestTag) {
     createTestSecret()
     sparkAppConf


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to