Repository: spark Updated Branches: refs/heads/branch-2.0 6ef923137 -> f6b87939c
[SPARK-17841][STREAMING][KAFKA] drain commitQueue ## What changes were proposed in this pull request? Actually drain commit queue rather than just iterating it. iterator() on a concurrent linked queue won't remove items from the queue, poll() will. ## How was this patch tested? Unit tests Author: cody koeninger <[email protected]> Closes #15407 from koeninger/SPARK-17841. (cherry picked from commit cd106b050ff789b6de539956a7f01159ab15c820) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6b87939 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6b87939 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6b87939 Branch: refs/heads/branch-2.0 Commit: f6b87939cb90bf4a0996b3728c1bccdf5e24dd4e Parents: 6ef9231 Author: cody koeninger <[email protected]> Authored: Tue Oct 18 14:01:49 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Tue Oct 18 14:01:59 2016 -0700 ---------------------------------------------------------------------- .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f6b87939/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 432537e..7e57bb1 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -282,13 +282,13 @@ private[spark] class DirectKafkaInputDStream[K, V]( protected def commitAll(): Unit = { val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]() - val it = commitQueue.iterator() - while (it.hasNext) { - val osr = it.next + var osr = commitQueue.poll() + while (null != osr) { val tp = osr.topicPartition val x = m.get(tp) val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) } m.put(tp, new OffsetAndMetadata(offset)) + osr = commitQueue.poll() } if (!m.isEmpty) { consumer.commitAsync(m, commitCallback.get) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
