This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 523818a [SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka async commit
523818a is described below
commit 523818a8ceda2b82e06089ee657e0bd92c4603e5
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Jul 10 09:35:39 2019 -0700
[SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka async commit
`DirectKafkaStreamSuite.offset recovery from kafka` commits offsets to
Kafka with the `Consumer.commitAsync` API (and then reads them back). Since this
API is asynchronous, it may deliver its completion notification late (or not at
all). The test assumes that if the data has been sent and collected, then the
offsets must have been committed as well, which is not true.
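For context, here is a minimal sketch (not the suite's code; `recordCommit` is a
hypothetical helper name) of the timing problem: the `OffsetCommitCallback`
passed to `commitAsync` fires on a later poll, so the caller cannot conclude the
commit has landed just because the produced data was already collected.

    import java.util.{Map => JMap}
    import java.util.concurrent.ConcurrentHashMap

    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
    import org.apache.kafka.common.TopicPartition

    // Shared between the callback thread and the test thread, hence concurrent.
    val committed = new ConcurrentHashMap[TopicPartition, OffsetAndMetadata]()

    // Hypothetical helper: request the commit and record its completion.
    def recordCommit(
        consumer: KafkaConsumer[String, String],
        offsets: JMap[TopicPartition, OffsetAndMetadata]): Unit = {
      consumer.commitAsync(offsets, new OffsetCommitCallback {
        override def onComplete(
            m: JMap[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
          // Runs later, possibly long after commitAsync returned -- or never,
          // if the consumer is closed before the broker's response arrives.
          if (e == null) committed.putAll(m)
        }
      })
    }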
In this PR I've made the following modifications:
* Wait for the async offset commit before the context is stopped (a
self-contained sketch of this wait follows the list)
* Added a commit-succeeded log to see whether the callback arrived at all
* Use a `ConcurrentHashMap` for the committed offsets because two threads
access the variable (`JobGenerator` and `ScalaTest...`)
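A self-contained sketch of that wait, assuming ScalaTest's `Eventually` and
`SpanSugar` (which Spark's test suites already use): the assertion is retried
until the callback has populated the map, and only then is the context stopped.

    import java.util.concurrent.ConcurrentHashMap

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.common.TopicPartition

    val committed = new ConcurrentHashMap[TopicPartition, OffsetAndMetadata]()

    // Re-evaluate the assertion every 50 ms until it passes or 10 s elapse;
    // only after it passes is it safe to stop the StreamingContext.
    eventually(timeout(10.seconds), interval(50.milliseconds)) {
      assert(!committed.isEmpty)
    }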
Tested with the existing unit test run in a loop, plus Jenkins runs.
Closes #25100 from gaborgsomogyi/SPARK-28335.
Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 579edf472822802285b5cd7d07f63503015eff5a)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 453b5e5..375409c 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka010
import java.io.File
import java.lang.{ Long => JLong }
import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
+import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong
@@ -423,7 +424,7 @@ class DirectKafkaStreamSuite
)
val collectedData = new ConcurrentLinkedQueue[String]()
- val committed = new JHashMap[TopicPartition, OffsetAndMetadata]()
+ val committed = new ConcurrentHashMap[TopicPartition, OffsetAndMetadata]()
// Send data to Kafka and wait for it to be received
def sendDataAndWaitForReceive(data: Seq[Int]) {
@@ -452,6 +453,7 @@ class DirectKafkaStreamSuite
logError("commit failed", e)
} else {
committed.putAll(m)
+ logDebug(s"commit succeeded: $m")
}
}
})
@@ -462,8 +464,10 @@ class DirectKafkaStreamSuite
for (i <- (1 to 10).grouped(4)) {
sendDataAndWaitForReceive(i)
}
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
+ assert(!committed.isEmpty)
+ }
ssc.stop()
- assert(! committed.isEmpty)
val consumer = new KafkaConsumer[String, String](kafkaParams)
consumer.subscribe(Arrays.asList(topic))
consumer.poll(0)