This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 25539af  CAMEL-13339 - Fix to use last processed offset maintained by Camel KafkaConsumer to avoid message loss upon partition revoke
25539af is described below

commit 25539afcfdf1966eaafc83b28657e6f410a5bc40
Author: Viswa Ramamoorthy <viramamoor...@manh.com>
AuthorDate: Fri Mar 22 07:37:04 2019 -0400

    CAMEL-13339 - Fix to use last processed offset maintained by Camel KafkaConsumer to avoid message loss upon partition revoke
---
 .../camel/component/kafka/KafkaConsumer.java       |  17 ++-
 .../KafkaConsumerRebalancePartitionRevokeTest.java | 123 +++++++++++++++++++++
 2 files changed, 136 insertions(+), 4 deletions(-)
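
For context, the pattern this fix introduces is: remember the offset of each record only after it has been fully processed, and persist that value from the rebalance listener when a partition is revoked, instead of relying on consumer.position(), which may point past records that were fetched but not yet processed. Below is a minimal, self-contained sketch of that idea; the class name, the OffsetStore interface and the "topic/partition" key format are illustrative assumptions, not Camel APIs.

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class LastProcessedOffsetListener implements ConsumerRebalanceListener {

        /** Hypothetical key/value store standing in for Camel's StateRepository. */
        public interface OffsetStore {
            void setState(String key, String value);
        }

        private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>();
        private final OffsetStore store;

        public LastProcessedOffsetListener(OffsetStore store) {
            this.store = store;
        }

        /** Call this only after a record has been fully processed. */
        public void onProcessed(ConsumerRecord<?, ?> record) {
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            lastProcessedOffset.put(key(partition), record.offset());
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Persist the last processed offset rather than consumer.position(),
            // which can be ahead of what the application has actually handled.
            for (TopicPartition partition : partitions) {
                Long offset = lastProcessedOffset.remove(key(partition));
                if (offset != null) {
                    store.setState(key(partition), String.valueOf(offset));
                }
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Resuming from the stored offsets is the counterpart, sketched at the end of this message.
        }

        private static String key(TopicPartition partition) {
            return partition.topic() + "/" + partition.partition();
        }
    }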

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ebb987c..9417dc7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -22,9 +22,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
 import java.util.stream.StreamSupport;
@@ -171,6 +173,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final Pattern topicPattern;
         private final String threadId;
         private final Properties kafkaProps;
+        private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>();
 
         KafkaFetchRecords(String topicName, Pattern topicPattern, String id, Properties kafkaProps) {
             this.topicName = topicName;
@@ -294,6 +297,7 @@ public class KafkaConsumer extends DefaultConsumer {
                         long partitionLastOffset = -1;
 
                         Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
+                        log.debug("Records count {} received for partition {}", allRecords.records(partition).size(), partition);
                         if (!breakOnErrorHit && recordIterator.hasNext()) {
                             ConsumerRecord<Object, Object> record;
 
@@ -341,6 +345,8 @@ public class KafkaConsumer extends DefaultConsumer {
                                 } else {
                                     // record was success so remember its offset
                                     partitionLastOffset = record.offset();
+                                    // lastProcessedOffset is used by the consumer re-balance listener to preserve offset state upon partition revoke
+                                    lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
                                 }
                             }
 
@@ -418,12 +424,15 @@ public class KafkaConsumer extends DefaultConsumer {
             log.debug("onPartitionsRevoked: {} from topic {}", threadId, 
topicName);
 
             StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
-            if (offsetRepository != null) {
                 for (TopicPartition partition : partitions) {
-                    long offset = consumer.position(partition);
-                    log.debug("Saving offset repository state {} from topic {} with offset: {}", threadId, topicName, offset);
-                    offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(offset));
+                String offsetKey = serializeOffsetKey(partition);
+                Long offset = lastProcessedOffset.get(offsetKey);
+                if (offset == null) {
+                    offset = -1l;
                 }
+                log.debug("Saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, offset);
+                commitOffset(offsetRepository, partition, offset, true);
+                lastProcessedOffset.remove(offsetKey);
             }
         }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
new file mode 100644
index 0000000..756a917
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.MemoryStateRepository;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Test;
+
+public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafkaTest {
+    private static final String TOPIC = "offset-rebalance";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+
+    private OffsetStateRepository stateRepository;
+    private CountDownLatch messagesLatch;
+    
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+
+        kafkaBroker.createTopic(TOPIC, 2);
+        for (int i = 0; i < 2; i++) {
+            producer.send(new ProducerRecord<>(TOPIC, i % 2, "key", "message-" + i));
+        }
+        messagesLatch = new CountDownLatch(1);
+        stateRepository = new OffsetStateRepository(messagesLatch);
+    }
+
+    @After
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+    }
+
+    @Test
+    public void ensurePartitionRevokeCallsWithLastProcessedOffset() throws Exception {
+        boolean partitionRevokeCalled = messagesLatch.await(30000, TimeUnit.MILLISECONDS);
+        assertTrue("StateRepository.setState should have been called with offset >= 0 for topic " + TOPIC
+                + ". Remaining count : " + messagesLatch.getCount(), partitionRevokeCalled);
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("offset", stateRepository);
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("kafka:" + TOPIC
+                             + "?groupId=" + TOPIC + "_GROUP"
+                             + "&autoCommitIntervalMs=1000"
+                             + "&autoOffsetReset=earliest"
+                             + "&consumersCount=1"
+                             + "&offsetRepository=#offset")
+                        .routeId("consumer-rebalance-route")
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public class OffsetStateRepository extends MemoryStateRepository {
+        CountDownLatch messagesLatch = null;
+        
+        public OffsetStateRepository(CountDownLatch messagesLatch) {
+            this.messagesLatch = messagesLatch;
+        }
+
+        @Override
+        public void start() throws Exception {
+        }
+
+        @Override
+        public void stop() throws Exception {
+        }
+
+        @Override
+        public String getState(String key) {
+            return super.getState(key);
+        }
+
+        @Override
+        public void setState(String key, String value) {
+            if (key.contains(TOPIC) && messagesLatch.getCount() > 0
+                       && Long.parseLong(value) >= 0) {
+                messagesLatch.countDown();
+            }
+            super.setState(key, value);
+        }
+    }
+}
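
As a closing note, the offsets persisted on revoke only help if a consumer reads them back once the partitions are assigned again. The sketch below is not Camel's code; it assumes offsets were stored as decimal strings keyed per "topic/partition" (as in the first sketch above) and that seeking to lastProcessed + 1 avoids redelivering the already-handled record. The class name and the plain Map standing in for a StateRepository are illustrative assumptions.

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    /** Illustrative listener that resumes from offsets saved on revoke. */
    public class ResumeFromStoreListener implements ConsumerRebalanceListener {

        private final Consumer<?, ?> consumer;
        private final Map<String, String> store; // stands in for a StateRepository<String, String>

        public ResumeFromStoreListener(Consumer<?, ?> consumer, Map<String, String> store) {
            this.consumer = consumer;
            this.store = store;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Saving the last processed offsets on revoke is shown in the first sketch above.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                String state = store.get(partition.topic() + "/" + partition.partition());
                if (state != null) {
                    // Seek one past the last processed record so it is not replayed.
                    consumer.seek(partition, Long.parseLong(state) + 1);
                }
            }
        }
    }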
