This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4a5f695 [SPARK-30541][TESTS] Implement KafkaDelegationTokenSuite with
testRetry
4a5f695 is described below
commit 4a5f6955ba9667298bf13e5cdaa7703d5389b083
Author: Gabor Somogyi <[email protected]>
AuthorDate: Sat Mar 21 18:59:29 2020 -0700
[SPARK-30541][TESTS] Implement KafkaDelegationTokenSuite with testRetry
### What changes were proposed in this pull request?
`KafkaDelegationTokenSuite` has been ignored because it showed flaky
behaviour. In this PR I've changed how the test is executed and
turned it on again. This PR contains the following:
* The test runs in a separate JVM in order to avoid a modified security context
* The body of the test runs in `testRetry`, which retries if it fails
* Additional logs to analyse possible failures
* Enhanced clean-up code
### Why are the changes needed?
`KafkaDelegationTokenSuite` is ignored.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Executed the test in a loop 1k+ times in Jenkins (locally it is much harder to
reproduce).
Closes #27877 from gaborgsomogyi/SPARK-30541.
Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit bf342bafa81738a47d511d3aa02812a1ccc0ecab)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
external/kafka-0-10-sql/src/test/resources/log4j.properties | 4 +++-
.../org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala | 2 +-
.../src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 4 ++++
project/SparkBuild.scala | 3 ++-
4 files changed, 10 insertions(+), 3 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/resources/log4j.properties
b/external/kafka-0-10-sql/src/test/resources/log4j.properties
index 75e3b53..daf0572 100644
--- a/external/kafka-0-10-sql/src/test/resources/log4j.properties
+++ b/external/kafka-0-10-sql/src/test/resources/log4j.properties
@@ -25,4 +25,6 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd
HH:mm:ss.SSS} %t %p %c{
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
-
+log4j.logger.org.apache.spark.sql.kafka010.KafkaTestUtils=DEBUG
+log4j.logger.org.apache.directory.server.kerberos.kdc.authentication=DEBUG
+log4j.logger.org.apache.directory.server.core.DefaultDirectoryService=DEBUG
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala
index 79239e5..702bd4f 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala
@@ -62,7 +62,7 @@ class KafkaDelegationTokenSuite extends StreamTest with
SharedSparkSession with
}
}
- ignore("Roundtrip") {
+ testRetry("Roundtrip", 3) {
val hadoopConf = new Configuration()
val manager = new HadoopDelegationTokenManager(spark.sparkContext.conf,
hadoopConf, null)
val credentials = new Credentials()
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7b972fe..c1ca557 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -170,6 +170,7 @@ class KafkaTestUtils(
kdc.getKrb5conf.delete()
Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8)
+ logDebug(s"krb5.conf file content: $krb5confStr")
}
private def addedKrb5Config(key: String, value: String): String = {
@@ -309,6 +310,7 @@ class KafkaTestUtils(
}
brokerReady = false
zkReady = false
+ kdcReady = false
if (producer != null) {
producer.close()
@@ -317,6 +319,7 @@ class KafkaTestUtils(
if (adminClient != null) {
adminClient.close()
+ adminClient = null
}
if (server != null) {
@@ -351,6 +354,7 @@ class KafkaTestUtils(
Configuration.getConfiguration.refresh()
if (kdc != null) {
kdc.stop()
+ kdc = null
}
UserGroupInformation.reset()
teardownKrbDebug()
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1a2a7c3..3889013 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -479,7 +479,8 @@ object SparkParallelTestGrouping {
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
- "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite"
+ "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
+ "org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
)
private val DEFAULT_TEST_GROUP = "default_test_group"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]