This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 23812e3b710ff249775a091c9615287ea4d695b4
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Tue Oct 11 13:47:06 2022 +0200

    CAMEL-18148: allow updating using the key and value separately
---
 .../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java | 11 ++++++++---
 .../src/main/java/org/apache/camel/resume/ResumeStrategy.java |  9 +++++++++
 .../camel/processor/resume/TransientResumeStrategy.java       |  7 +++++++
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 8689eeed96e..995d4103740 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -132,8 +132,13 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
             LOG.debug("Updating offset on Kafka with key {} to {}", 
key.getValue(), offsetValue.getValue());
         }
 
-        ByteBuffer keyBuffer = key.serialize();
-        ByteBuffer valueBuffer = offsetValue.serialize();
+        updateLastOffset(key, offsetValue);
+    }
+
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) 
throws Exception {
+        ByteBuffer keyBuffer = offsetKey.serialize();
+        ByteBuffer valueBuffer = offset.serialize();
 
         try {
             lock.lock();
@@ -142,7 +147,7 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
             lock.unlock();
         }
 
-        doAdd(key, offsetValue);
+        doAdd(offsetKey, offset);
     }
 
     /**
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index a9325b829b4..145fdc7145d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -65,4 +65,13 @@ public interface ResumeStrategy extends Service {
      * @throws Exception if unable to update the offset
      */
     <T extends Resumable> void updateLastOffset(T offset) throws Exception;
+
+    /**
+     * Updates the last processed offset
+     *
+     * @param  offsetKey the offset key to update
+     * @param  offset    the offset value to update
+     * @throws Exception if unable to update the offset
+     */
+    void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws 
Exception;
 }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 0e926152536..55147474ea2 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.processor.resume;
 
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeStrategy;
@@ -52,6 +54,11 @@ public class TransientResumeStrategy implements 
ResumeStrategy {
         // this is NO-OP
     }
 
+    @Override
+    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
+        // this is NO-OP
+    }
+
     @Override
     public void start() {
         // this is NO-OP

Reply via email to