This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 398888d47cd0db2179c7e55991d2181671650cf7 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Sat May 21 09:58:10 2022 +0200 CAMEL-18135: fix incorrect sync commit of async manual commit --- .../camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java | 2 +- .../component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java index fbc4d0610a6..e38761c4ef7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java @@ -29,6 +29,6 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl @Override public void commit() { - commitManager.forceCommit(getPartition(), getRecordOffset()); + commitManager.recordOffset(getPartition(), getRecordOffset()); } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java index 222d326692e..f1e007482d5 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java @@ -41,10 +41,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class); - public static final String TOPIC = "testManualCommitTest"; + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class); + @EndpointInject("kafka:" + TOPIC + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false" + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory")