Repository: spark Updated Branches: refs/heads/master 098e4ca9c -> 67659c9af
[SPARK-18212][SS][KAFKA] increase executor poll timeout ## What changes were proposed in this pull request? Increase poll timeout to try and address flaky test ## How was this patch tested? Ran existing unit tests Author: cody koeninger <[email protected]> Closes #15737 from koeninger/SPARK-18212. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67659c9a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67659c9a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67659c9a Branch: refs/heads/master Commit: 67659c9afaeb2289e56fd87fafee953e8f050383 Parents: 098e4ca Author: cody koeninger <[email protected]> Authored: Thu Nov 3 14:43:25 2016 -0700 Committer: Michael Armbrust <[email protected]> Committed: Thu Nov 3 14:43:25 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 5 ++++- .../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/67659c9a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 61cba73..b21508c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource( private val sc = sqlContext.sparkContext - private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong + private val pollTimeoutMs = sourceOptions.getOrElse( + "kafkaConsumer.pollTimeoutMs", + sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString + ).toLong private val maxOffsetFetchAttempts = sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt http://git-wip-us.apache.org/repos/asf/spark/blob/67659c9a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 5b5a9ac..9839425 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V]( " must be set to false for executor kafka params, else offsets may commit before processing") // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time? - private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512) + private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", + conf.getTimeAsMs("spark.network.timeout", "120s")) private val cacheInitialCapacity = conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) private val cacheMaxCapacity = --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
