This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 23812e3b710ff249775a091c9615287ea4d695b4 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Tue Oct 11 13:47:06 2022 +0200 CAMEL-18148: allow updating using the key and value separately --- .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java | 11 ++++++++--- .../src/main/java/org/apache/camel/resume/ResumeStrategy.java | 9 +++++++++ .../camel/processor/resume/TransientResumeStrategy.java | 7 +++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 8689eeed96e..995d4103740 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -132,8 +132,13 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka LOG.debug("Updating offset on Kafka with key {} to {}", key.getValue(), offsetValue.getValue()); } - ByteBuffer keyBuffer = key.serialize(); - ByteBuffer valueBuffer = offsetValue.serialize(); + updateLastOffset(key, offsetValue); + } + + @Override + public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception { + ByteBuffer keyBuffer = offsetKey.serialize(); + ByteBuffer valueBuffer = offset.serialize(); try { lock.lock(); @@ -142,7 +147,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka lock.unlock(); } - doAdd(key, offsetValue); + doAdd(offsetKey, offset); } /** diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java index a9325b829b4..145fdc7145d 100644 --- 
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java @@ -65,4 +65,13 @@ public interface ResumeStrategy extends Service { * @throws Exception if unable to update the offset */ <T extends Resumable> void updateLastOffset(T offset) throws Exception; + + /** + * Updates the last processed offset + * + * @param offsetKey the offset key to update + * @param offset the offset value to update + * @throws Exception if unable to update the offset + */ + void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception; } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java index 0e926152536..55147474ea2 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java @@ -17,6 +17,8 @@ package org.apache.camel.processor.resume; +import org.apache.camel.resume.Offset; +import org.apache.camel.resume.OffsetKey; import org.apache.camel.resume.Resumable; import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.ResumeStrategy; @@ -52,6 +54,11 @@ public class TransientResumeStrategy implements ResumeStrategy { // this is NO-OP } + @Override + public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) { + // this is NO-OP + } + @Override public void start() { // this is NO-OP