CAMEL-8653: camel-kafka consumer should commit offset on consumer when its no longer running such as suspending/shutting down.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/805db35b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/805db35b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/805db35b Branch: refs/heads/master Commit: 805db35b482b0f3b65df80906e1d42dcea1b3c9e Parents: e09c51d Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Aug 7 16:56:36 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 7 16:56:36 2015 +0200 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 24 ++++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/805db35b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index cb2dc9d..94893fb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -33,6 +33,7 @@ import kafka.message.MessageAndMetadata; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ public class KafkaConsumer extends DefaultConsumer { protected void doStart() throws Exception { super.doStart(); log.info("Starting Kafka consumer"); + executor = endpoint.createExecutor(); for (int i = 0; i < endpoint.getConsumersCount(); i++) { ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps())); @@ -79,8 +81,10 @@ public class KafkaConsumer extends DefaultConsumer { topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams()); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic()); + + // commit periodically if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) { - if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs().intValue() < 0) + if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0) && endpoint.getConsumerStreams() > 1) { LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams."); } @@ -90,8 +94,9 @@ public class KafkaConsumer extends DefaultConsumer { } consumerBarriers.put(consumer, barrier); } else { + // auto commit for (final KafkaStream<byte[], byte[]> stream : streams) { - executor.submit(new AutoCommitConsumerTask(stream)); + executor.submit(new AutoCommitConsumerTask(consumer, stream)); } consumerBarriers.put(consumer, null); } @@ -103,11 +108,14 @@ public class KafkaConsumer extends DefaultConsumer { protected void doStop() throws Exception { super.doStop(); log.info("Stopping Kafka consumer"); + for (ConsumerConnector consumer : consumerBarriers.keySet()) { if (consumer != null) { consumer.shutdown(); } } + consumerBarriers.clear(); + if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); @@ -175,7 +183,7 @@ public class KafkaConsumer extends DefaultConsumer { class CommitOffsetTask implements Runnable { - private ConsumerConnector consumer; + private final ConsumerConnector consumer; public CommitOffsetTask(ConsumerConnector consumer) { this.consumer = consumer; @@ -183,22 +191,25 @@ public class KafkaConsumer extends DefaultConsumer { @Override public void run() { + LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer)); consumer.commitOffsets(); } } class AutoCommitConsumerTask implements Runnable { + private final ConsumerConnector consumer; private KafkaStream<byte[], byte[]> stream; - public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) { + public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) { + this.consumer = consumer; this.stream = stream; } public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); // only poll the next message if we are allowed to run and are not suspending - while (isRunAllowed() && it.hasNext()) { + while (isRunAllowed() && !isSuspendingOrSuspended() && it.hasNext()) { MessageAndMetadata<byte[], byte[]> mm = it.next(); Exchange exchange = endpoint.createKafkaExchange(mm); try { @@ -207,6 +218,9 @@ public class KafkaConsumer extends DefaultConsumer { getExceptionHandler().handleException("Error during processing", exchange, e); } } + // no more data so commit offset + LOG.debug("Commit offsets on consumer: {}", ObjectHelper.getIdentityHashCode(consumer)); + consumer.commitOffsets(); } } }