Repository: spark
Updated Branches:
  refs/heads/master cd662bc7a -> cd106b050
[SPARK-17841][STREAMING][KAFKA] drain commitQueue

## What changes were proposed in this pull request?

Actually drain commit queue rather than just iterating it.

iterator() on a concurrent linked queue won't remove items from the queue, poll() will.

## How was this patch tested?

Unit tests

Author: cody koeninger <[email protected]>

Closes #15407 from koeninger/SPARK-17841.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd106b05
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd106b05
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd106b05

Branch: refs/heads/master
Commit: cd106b050ff789b6de539956a7f01159ab15c820
Parents: cd662bc
Author: cody koeninger <[email protected]>
Authored: Tue Oct 18 14:01:49 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Tue Oct 18 14:01:49 2016 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/cd106b05/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 432537e..7e57bb1 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -282,13 +282,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected def commitAll(): Unit = {
     val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
-    val it = commitQueue.iterator()
-    while (it.hasNext) {
-      val osr = it.next
+    var osr = commitQueue.poll()
+    while (null != osr) {
       val tp = osr.topicPartition
       val x = m.get(tp)
       val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
       m.put(tp, new OffsetAndMetadata(offset))
+      osr = commitQueue.poll()
     }
     if (!m.isEmpty) {
       consumer.commitAsync(m, commitCallback.get)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
