This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4080c4b [SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakiness
4080c4b is described below
commit 4080c4beeb9cb27027145a37799ee8599ee51aab
Author: Holden Karau <[email protected]>
AuthorDate: Fri Sep 20 10:08:16 2019 -0700
[SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakiness
### What changes were proposed in this pull request?
Instead of using a fixed Thread.sleep to wait for commands to finish, wait
for the command to actually finish using a watcher (an exec listener). Also
improve the error messages in the SecretsTestsSuite.
### Why are the changes needed?
Currently some of the Spark Kubernetes tests have race conditions with
command execution, and the frequent use of ScalaTest's `eventually` blocks
makes debugging test failures difficult.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
The existing tests pass after removing Thread.sleep.
Closes #25765 from
holdenk/SPARK-28937SPARK-28936-improve-kubernetes-integration-tests.
Authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
---
.../k8s/integrationtest/SecretsTestsSuite.scala | 51 +++++++++++++++-------
.../spark/deploy/k8s/integrationtest/Utils.scala | 40 +++++++++++++++--
2 files changed, 72 insertions(+), 19 deletions(-)
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index cd61ea1..54a9dbf 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
+import java.util.Locale
+
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
@@ -57,11 +59,17 @@ private[spark] trait SecretsTestsSuite { k8sSuite:
KubernetesSuite =>
createTestSecret()
sparkAppConf
.set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME",
SECRET_MOUNT_PATH)
- .set(s"spark.kubernetes.driver.secretKeyRef.USERNAME",
s"$ENV_SECRET_NAME:username")
- .set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD",
s"$ENV_SECRET_NAME:password")
+ .set(
+ s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
+ s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_1}")
+ .set(
+ s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
+ s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_2}")
.set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME",
SECRET_MOUNT_PATH)
- .set(s"spark.kubernetes.executor.secretKeyRef.USERNAME",
s"$ENV_SECRET_NAME:username")
- .set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD",
s"$ENV_SECRET_NAME:password")
+ .set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
+ s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_1")
+ .set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
+ s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_2")
try {
runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
@@ -81,19 +89,30 @@ private[spark] trait SecretsTestsSuite { k8sSuite:
KubernetesSuite =>
}
private def checkSecrets(pod: Pod): Unit = {
- Eventually.eventually(TIMEOUT, INTERVAL) {
- implicit val podName: String = pod.getMetadata.getName
- implicit val components: KubernetesTestComponents =
kubernetesTestComponents
+ logDebug(s"Checking secrets for ${pod}")
+ // Wait for the pod to become ready & have secrets provisioned
+ implicit val podName: String = pod.getMetadata.getName
+ implicit val components: KubernetesTestComponents =
kubernetesTestComponents
+ val env = Eventually.eventually(TIMEOUT, INTERVAL) {
+ logDebug(s"Checking env of ${pod.getMetadata().getName()} ....")
val env = Utils.executeCommand("env")
- assert(env.toString.contains(ENV_SECRET_VALUE_1))
- assert(env.toString.contains(ENV_SECRET_VALUE_2))
- val fileUsernameContents = Utils
- .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
- val filePasswordContents = Utils
- .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
- assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
- assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
+ assert(!env.isEmpty)
+ env
}
+ env.toString should include
(s"${ENV_SECRET_KEY_1_CAP}=$ENV_SECRET_VALUE_1")
+ env.toString should include
(s"${ENV_SECRET_KEY_2_CAP}=$ENV_SECRET_VALUE_2")
+
+ // Make sure our secret files are mounted correctly
+ val files = Utils.executeCommand("ls", s"$SECRET_MOUNT_PATH")
+ files should include (ENV_SECRET_KEY_1)
+ files should include (ENV_SECRET_KEY_2)
+ // Validate the contents
+ val fileUsernameContents = Utils
+ .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
+ fileUsernameContents.toString.trim should equal(ENV_SECRET_VALUE_1)
+ val filePasswordContents = Utils
+ .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
+ filePasswordContents.toString.trim should equal(ENV_SECRET_VALUE_2)
}
}
@@ -102,6 +121,8 @@ private[spark] object SecretsTestsSuite {
val SECRET_MOUNT_PATH = "/etc/secret"
val ENV_SECRET_KEY_1 = "username"
val ENV_SECRET_KEY_2 = "password"
+ val ENV_SECRET_KEY_1_CAP = ENV_SECRET_KEY_1.toUpperCase(Locale.ROOT)
+ val ENV_SECRET_KEY_2_CAP = ENV_SECRET_KEY_2.toUpperCase(Locale.ROOT)
val ENV_SECRET_VALUE_1 = "secretusername"
val ENV_SECRET_VALUE_2 = "secretpassword"
}
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index a687a1b..9f85805 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -18,9 +18,12 @@ package org.apache.spark.deploy.k8s.integrationtest
import java.io.{Closeable, File, PrintWriter}
import java.nio.file.{Files, Path}
+import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
+import io.fabric8.kubernetes.client.dsl.ExecListener
+import okhttp3.Response
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.spark.{SPARK_VERSION, SparkException}
@@ -45,20 +48,49 @@ object Utils extends Logging {
implicit podName: String,
kubernetesTestComponents: KubernetesTestComponents): String = {
val out = new ByteArrayOutputStream()
- val watch = kubernetesTestComponents
+ val pod = kubernetesTestComponents
.kubernetesClient
.pods()
.withName(podName)
+ // Avoid timing issues by looking for open/close
+ class ReadyListener extends ExecListener {
+ val openLatch: CountDownLatch = new CountDownLatch(1)
+ val closeLatch: CountDownLatch = new CountDownLatch(1)
+
+ override def onOpen(response: Response) {
+ openLatch.countDown()
+ }
+
+ override def onClose(a: Int, b: String) {
+ closeLatch.countDown()
+ }
+
+ override def onFailure(e: Throwable, r: Response) {
+ }
+
+ def waitForInputStreamToConnect(): Unit = {
+ openLatch.await()
+ }
+
+ def waitForClose(): Unit = {
+ closeLatch.await()
+ }
+ }
+ val listener = new ReadyListener()
+ val watch = pod
.readingInput(System.in)
.writingOutput(out)
.writingError(System.err)
.withTTY()
+ .usingListener(listener)
.exec(cmd.toArray: _*)
- // wait to get some result back
- Thread.sleep(1000)
+ // under load sometimes the stdout isn't connected by the time we try to
read from it.
+ listener.waitForInputStreamToConnect()
+ listener.waitForClose()
watch.close()
out.flush()
- out.toString()
+ val result = out.toString()
+ result
}
def createTempFile(contents: String, hostPath: String): String = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]