This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 25539af CAMEL-13339 - Fix to use last processed offset maintained by Camel KafkaConsumer to avoid message loss upon partition revoke 25539af is described below commit 25539afcfdf1966eaafc83b28657e6f410a5bc40 Author: Viswa Ramamoorthy <viramamoor...@manh.com> AuthorDate: Fri Mar 22 07:37:04 2019 -0400 CAMEL-13339 - Fix to use last processed offset maintained by Camel KafkaConsumer to avoid message loss upon partition revoke --- .../camel/component/kafka/KafkaConsumer.java | 17 ++- .../KafkaConsumerRebalancePartitionRevokeTest.java | 123 +++++++++++++++++++++ 2 files changed, 136 insertions(+), 4 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index ebb987c..9417dc7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -22,9 +22,11 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; import java.util.stream.StreamSupport; @@ -171,6 +173,7 @@ public class KafkaConsumer extends DefaultConsumer { private final Pattern topicPattern; private final String threadId; private final Properties kafkaProps; + private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>(); KafkaFetchRecords(String topicName, Pattern topicPattern, String id, Properties kafkaProps) { this.topicName = topicName; @@ -294,6 +297,7 @@ public class KafkaConsumer extends DefaultConsumer { long partitionLastOffset = -1; Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator(); + log.debug("Records count {} received for partition {}", allRecords.records(partition).size(), partition); if (!breakOnErrorHit && recordIterator.hasNext()) { ConsumerRecord<Object, Object> record; @@ -341,6 +345,8 @@ public class KafkaConsumer extends DefaultConsumer { } else { // record was success so remember its offset partitionLastOffset = record.offset(); + //lastOffsetProcessed would be used by Consumer re-balance listener to preserve offset state upon partition revoke + lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset); } } @@ -418,12 +424,15 @@ public class KafkaConsumer extends DefaultConsumer { log.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName); StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository(); - if (offsetRepository != null) { for (TopicPartition partition : partitions) { - long offset = consumer.position(partition); - log.debug("Saving offset repository state {} from topic {} with offset: {}", threadId, topicName, offset); - offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(offset)); + String offsetKey = serializeOffsetKey(partition); + Long offset = lastProcessedOffset.get(offsetKey); + if (offset == null) { + offset = -1l; } + log.debug("Saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset); + commitOffset(offsetRepository, partition, offset, true); + lastProcessedOffset.remove(offsetKey); } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java new file mode 100644 index 0000000..756a917 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.impl.MemoryStateRepository; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.After; +import org.junit.Test; + +public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafkaTest { + private static final String TOPIC = "offset-rebalance"; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + private OffsetStateRepository stateRepository; + private CountDownLatch messagesLatch; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @Override + protected void doPreSetup() throws Exception { + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + + kafkaBroker.createTopic(TOPIC, 2); + for (int i = 0; i < 2; i++) { + producer.send(new ProducerRecord<>(TOPIC, i % 2, "key", "message-" + i)); + } + messagesLatch = new CountDownLatch(1); + stateRepository = new OffsetStateRepository(messagesLatch); + } + + @After + public void after() { + if (producer != null) { + producer.close(); + } + } + + @Test + public void ensurePartitionRevokeCallsWithLastProcessedOffset() throws Exception { + boolean partitionRevokeCalled = messagesLatch.await(30000, TimeUnit.MILLISECONDS); + assertTrue("StateRepository.setState should have been called with offset >= 0 for topic" + TOPIC + + ". Remaining count : " + messagesLatch.getCount(), partitionRevokeCalled); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("offset", stateRepository); + return registry; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("kafka:" + TOPIC + + "?groupId=" + TOPIC + "_GROUP" + + "&autoCommitIntervalMs=1000" + + "&autoOffsetReset=earliest" + + "&consumersCount=1" + + "&offsetRepository=#offset") + .routeId("consumer-rebalance-route") + .to("mock:result"); + } + }; + } + + public class OffsetStateRepository extends MemoryStateRepository { + CountDownLatch messagesLatch = null; + + public OffsetStateRepository(CountDownLatch messagesLatch) { + this.messagesLatch = messagesLatch; + } + + @Override + public void start() throws Exception { + } + + @Override + public void stop() throws Exception { + } + + @Override + public String getState(String key) { + return super.getState(key); + } + + @Override + public void setState(String key, String value) { + if (key.contains(TOPIC) && messagesLatch.getCount() > 0 + && Long.parseLong(value) >= 0) { + messagesLatch.countDown(); + } + super.setState(key, value); + } + } +}