Repository: camel
Updated Branches:
  refs/heads/master a0ae974f9 -> 95488a5a6


CAMEL-8636 Committed the last batch of message when the auto commit is false


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/95488a5a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/95488a5a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/95488a5a

Branch: refs/heads/master
Commit: 95488a5a65f7dcb03a87adfe605f27e3ec93f736
Parents: a0ae974
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Wed Apr 15 14:39:41 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Wed Apr 15 18:57:04 2015 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/95488a5a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index d6b49d2..46d258d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -136,29 +136,31 @@ public class KafkaConsumer extends DefaultConsumer {
             boolean consumerTimeout;
             MessageAndMetadata<byte[], byte[]> mm = null;
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
-
-            while (true) {
+            boolean hasNext = true;
+            while (hasNext) {
                 try {
                     consumerTimeout = false;
                     if (it.hasNext()) {
                         mm = it.next();
+                        Exchange exchange = endpoint.createKafkaExchange(mm);
+                        try {
+                            processor.process(exchange);
+                        } catch (Exception e) {
+                            LOG.error(e.getMessage(), e);
+                        }
+                        processed++;
                     } else {
-                        break;
-                    }
-                    Exchange exchange = endpoint.createKafkaExchange(mm);
-                    try {
-                        processor.process(exchange);
-                    } catch (Exception e) {
-                        LOG.error(e.getMessage(), e);
+                        // we don't need to process the message
+                        hasNext = false;
                     }
-                    processed++;
                 } catch (ConsumerTimeoutException e) {
                     LOG.debug(e.getMessage(), e);
                     consumerTimeout = true;
                 }
 
-                if (processed >= endpoint.getBatchSize() || consumerTimeout) {
+                if (processed >= endpoint.getBatchSize() || consumerTimeout
+                    || (processed > 0 && !hasNext)) {
                     // Need to commit the offset for the last round
                     try {
                         berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                         if (!consumerTimeout) {