This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch km
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f76274400ca0fedae499b53c16e0454886864156
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Mar 6 12:02:24 2025 +0100

    CAMEL-21833: Move kamelet kafka manual commit to camel-kafka due to 
classloading problems.
---
 .../camel/component}/kafka/BatchManualCommit.java  | 30 ++++++++++++++--------
 .../apache/camel/component/kafka/ManualCommit.java | 23 +++++++++++++++++
 .../consumer/DefaultKafkaManualAsyncCommit.java    |  1 -
 .../kafka/consumer/DefaultKafkaManualCommit.java   | 10 +++++---
 .../batching/KafkaRecordBatchingProcessor.java     |  5 ++++
 .../utils/transform/kafka/BatchManualCommit.java   |  1 +
 .../utils/transform/kafka/ManualCommit.java        |  1 +
 7 files changed, 56 insertions(+), 15 deletions(-)

diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
similarity index 55%
copy from 
components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
copy to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
index 8b97d3b9ff1..3bf8c2495f2 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
@@ -14,29 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kamelet.utils.transform.kafka;
+package org.apache.camel.component.kafka;
 
 import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BatchManualCommit implements Processor {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchManualCommit.class);
+
     @Override
     public void process(Exchange exchange) throws Exception {
-        List<?> exchanges = exchange.getMessage().getBody(List.class);
-        if (exchanges.size() > 0) {
-            final Object tmp = exchanges.get(exchanges.size() - 1);
-            if (tmp instanceof Exchange element) {
-                KafkaManualCommit manual
-                        = 
element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
-                if (manual != null) {
-                    manual.commit();
+        KafkaManualCommit manual
+                = 
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        if (manual == null) {
+            List<?> exchanges = exchange.getMessage().getBody(List.class);
+            if (exchanges != null && !exchanges.isEmpty()) {
+                Object obj = exchanges.get(exchanges.size() - 1);
+                if (obj instanceof Exchange last) {
+                    manual = 
last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
                 }
             }
         }
+
+        if (manual != null) {
+            LOG.debug("Performing Kafka Batch manual commit: {}", manual);
+            manual.commit();
+        } else {
+            LOG.debug("Cannot perform Kafka Batch manual commit due header: {} 
is missing", KafkaConstants.MANUAL_COMMIT);
+        }
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
new file mode 100644
index 00000000000..7d1d469b94c
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
@@ -0,0 +1,23 @@
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManualCommit implements Processor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchManualCommit.class);
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        KafkaManualCommit manual = 
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        if (manual != null) {
+            LOG.debug("Performing Kafka manual commit: {}", manual);
+            manual.commit();
+        } else {
+            LOG.debug("Cannot perform Kafka manual commit due header: {} is 
missing", KafkaConstants.MANUAL_COMMIT);
+        }
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
index e38761c4ef7..7eea8cdc5f4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
@@ -23,7 +23,6 @@ public class DefaultKafkaManualAsyncCommit extends 
DefaultKafkaManualCommit impl
                                          
KafkaManualCommitFactory.KafkaRecordPayload recordPayload,
                                          CommitManager commitManager) {
         super(camelExchangePayload, recordPayload);
-
         this.commitManager = commitManager;
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
index 5aa71a8ccfc..617526cae0a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
@@ -66,8 +66,6 @@ public abstract class DefaultKafkaManualCommit implements 
KafkaManualCommit {
 
     /**
      * Gets the Camel Exchange payload
-     *
-     * @return
      */
     public KafkaManualCommitFactory.CamelExchangePayload 
getCamelExchangePayload() {
         return camelExchangePayload;
@@ -75,10 +73,14 @@ public abstract class DefaultKafkaManualCommit implements 
KafkaManualCommit {
 
     /**
      * Gets the Kafka record payload
-     *
-     * @return
      */
     public KafkaManualCommitFactory.KafkaRecordPayload getKafkaRecordPayload() 
{
         return kafkaRecordPayload;
     }
+
+    @Override
+    public String toString() {
+        return "KafkaManualCommit[topic=" + getTopicName() + ", offset=" + 
getRecordOffset() + "]";
+    }
+
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 09247cddc39..44193f89c71 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -159,8 +159,13 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
         Message message = exchange.getMessage();
         var exchanges = exchangeList.stream().toList();
         message.setBody(exchanges);
+
         try {
             if (configuration.isAllowManualCommit()) {
+                Exchange last = exchanges.isEmpty() ? null : 
exchanges.get(exchanges.size() - 1);
+                if (last != null) {
+                    message.setHeader(KafkaConstants.MANUAL_COMMIT, 
last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT));
+                }
                 manualCommitResultProcessing(camelKafkaConsumer, exchange);
             } else {
                 autoCommitResultProcessing(camelKafkaConsumer, exchange, 
exchanges.size());
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
index 8b97d3b9ff1..59258773a0f 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
@@ -23,6 +23,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 
+@Deprecated
 public class BatchManualCommit implements Processor {
 
     @Override
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
index db92916f2b9..97bb8040a6d 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
@@ -21,6 +21,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 
+@Deprecated
 public class ManualCommit implements Processor {
 
     @Override

Reply via email to