This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3635a7c7cd4 kafka manual commit classloading issue (#17372)
3635a7c7cd4 is described below

commit 3635a7c7cd413533e490149a40cd967d06478da0
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Mar 6 12:13:52 2025 +0000

    kafka manual commit classloading issue (#17372)
    
    CAMEL-21833: Move kamelet kafka manual commit to camel-kafka due to 
classloading problems. Remove dependency on camel-kafka in camel-kamelet.
---
 .../camel/component}/kafka/BatchManualCommit.java  | 30 ++++++++++++++--------
 .../camel/component}/kafka/ManualCommit.java       | 10 ++++++--
 .../consumer/DefaultKafkaManualAsyncCommit.java    |  1 -
 .../kafka/consumer/DefaultKafkaManualCommit.java   | 10 +++++---
 .../batching/KafkaRecordBatchingProcessor.java     |  5 ++++
 components/camel-kamelet/pom.xml                   |  5 ----
 .../kafka/KafkaHeaderDeserializer.java             |  6 +----
 .../utils/transform/MessageTimestampRouter.java    |  5 ++--
 .../kamelet/utils/transform/RegexRouter.java       |  5 ++--
 .../kamelet/utils/transform/TimestampRouter.java   |  5 ++--
 .../kamelet/utils/transform/kafka/ValueToKey.java  |  3 +--
 .../kamelet/utils/transform/RegexRouterTest.java   |  5 ++--
 12 files changed, 49 insertions(+), 41 deletions(-)

diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
similarity index 55%
rename from 
components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
rename to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
index 8b97d3b9ff1..3bf8c2495f2 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
@@ -14,29 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kamelet.utils.transform.kafka;
+package org.apache.camel.component.kafka;
 
 import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BatchManualCommit implements Processor {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchManualCommit.class);
+
     @Override
     public void process(Exchange exchange) throws Exception {
-        List<?> exchanges = exchange.getMessage().getBody(List.class);
-        if (exchanges.size() > 0) {
-            final Object tmp = exchanges.get(exchanges.size() - 1);
-            if (tmp instanceof Exchange element) {
-                KafkaManualCommit manual
-                        = 
element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
-                if (manual != null) {
-                    manual.commit();
+        KafkaManualCommit manual
+                = 
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        if (manual == null) {
+            List<?> exchanges = exchange.getMessage().getBody(List.class);
+            if (exchanges != null && !exchanges.isEmpty()) {
+                Object obj = exchanges.get(exchanges.size() - 1);
+                if (obj instanceof Exchange last) {
+                    manual = 
last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
                 }
             }
         }
+
+        if (manual != null) {
+            LOG.debug("Performing Kafka Batch manual commit: {}", manual);
+            manual.commit();
+        } else {
+            LOG.debug("Cannot perform Kafka Batch manual commit due header: {} 
is missing", KafkaConstants.MANUAL_COMMIT);
+        }
     }
 }
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
similarity index 76%
rename from 
components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
rename to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
index db92916f2b9..8fa06ab73aa 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
@@ -14,20 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kamelet.utils.transform.kafka;
+package org.apache.camel.component.kafka;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ManualCommit implements Processor {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchManualCommit.class);
+
     @Override
     public void process(Exchange exchange) throws Exception {
         KafkaManualCommit manual = 
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
         if (manual != null) {
+            LOG.debug("Performing Kafka manual commit: {}", manual);
             manual.commit();
+        } else {
+            LOG.debug("Cannot perform Kafka manual commit due header: {} is 
missing", KafkaConstants.MANUAL_COMMIT);
         }
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
index e38761c4ef7..7eea8cdc5f4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
@@ -23,7 +23,6 @@ public class DefaultKafkaManualAsyncCommit extends 
DefaultKafkaManualCommit impl
                                          
KafkaManualCommitFactory.KafkaRecordPayload recordPayload,
                                          CommitManager commitManager) {
         super(camelExchangePayload, recordPayload);
-
         this.commitManager = commitManager;
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
index 5aa71a8ccfc..617526cae0a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
@@ -66,8 +66,6 @@ public abstract class DefaultKafkaManualCommit implements 
KafkaManualCommit {
 
     /**
      * Gets the Camel Exchange payload
-     *
-     * @return
      */
     public KafkaManualCommitFactory.CamelExchangePayload 
getCamelExchangePayload() {
         return camelExchangePayload;
@@ -75,10 +73,14 @@ public abstract class DefaultKafkaManualCommit implements 
KafkaManualCommit {
 
     /**
      * Gets the Kafka record payload
-     *
-     * @return
      */
     public KafkaManualCommitFactory.KafkaRecordPayload getKafkaRecordPayload() 
{
         return kafkaRecordPayload;
     }
+
+    @Override
+    public String toString() {
+        return "KafkaManualCommit[topic=" + getTopicName() + ", offset=" + 
getRecordOffset() + "]";
+    }
+
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 09247cddc39..44193f89c71 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -159,8 +159,13 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
         Message message = exchange.getMessage();
         var exchanges = exchangeList.stream().toList();
         message.setBody(exchanges);
+
         try {
             if (configuration.isAllowManualCommit()) {
+                Exchange last = exchanges.isEmpty() ? null : 
exchanges.get(exchanges.size() - 1);
+                if (last != null) {
+                    message.setHeader(KafkaConstants.MANUAL_COMMIT, 
last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT));
+                }
                 manualCommitResultProcessing(camelKafkaConsumer, exchange);
             } else {
                 autoCommitResultProcessing(camelKafkaConsumer, exchange, 
exchanges.size());
diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml
index b6f9a7149b5..018a567d0f1 100644
--- a/components/camel-kamelet/pom.xml
+++ b/components/camel-kamelet/pom.xml
@@ -71,11 +71,6 @@
             <version>1.2</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-kafka</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-jackson</artifactId>
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
index 6cb8f2f1b02..7f5eae8041a 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.TypeConverter;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.support.SimpleTypeConverter;
 
 /**
@@ -84,12 +83,9 @@ public class KafkaHeaderDeserializer implements Processor {
 
     /**
      * Exclude special Kafka headers from auto deserialization.
-     *
-     * @param  entry
-     * @return
      */
     private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
-        return !entry.getKey().equals(KafkaConstants.HEADERS) && 
!entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
+        return !entry.getKey().equals("kafka.HEADERS") && 
!entry.getKey().equals("CamelKafkaManualCommit");
     }
 
     public void setEnabled(String enabled) {
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
index 3b1ef2eb3dc..048a74ec032 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class MessageTimestampRouter {
@@ -55,7 +54,7 @@ public class MessageTimestampRouter {
         }
 
         Object rawTimestamp = null;
-        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
+        String topicName = ex.getMessage().getHeader("kafka.TOPIC", 
String.class);
         for (String key : splittedKeys) {
             if (ObjectHelper.isNotEmpty(key)) {
                 rawTimestamp = body.get(key);
@@ -83,7 +82,7 @@ public class MessageTimestampRouter {
                 replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
                 updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
             }
-            ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
updatedTopic);
+            ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
         }
     }
 
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
index 517f1d0b787..4bbc27a56ab 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
@@ -21,7 +21,6 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class RegexRouter {
@@ -29,12 +28,12 @@ public class RegexRouter {
     public void process(
             @ExchangeProperty("regex") String regex, 
@ExchangeProperty("replacement") String replacement, Exchange ex) {
         Pattern regexPattern = Pattern.compile(regex);
-        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
+        String topicName = ex.getMessage().getHeader("kafka.TOPIC", 
String.class);
         if (ObjectHelper.isNotEmpty(topicName)) {
             final Matcher matcher = regexPattern.matcher(topicName);
             if (matcher.matches()) {
                 final String topicUpdated = matcher.replaceFirst(replacement);
-                ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
topicUpdated);
+                ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", 
topicUpdated);
             }
         }
         String ceType = ex.getMessage().getHeader("ce-type", String.class);
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
index 9c660497f0e..db41dcbf013 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
@@ -25,7 +25,6 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class TimestampRouter {
@@ -41,7 +40,7 @@ public class TimestampRouter {
         fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
 
         Long timestamp = null;
-        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
+        String topicName = ex.getMessage().getHeader("kafka.TOPIC", 
String.class);
         Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
         if (rawTimestamp instanceof Long) {
             timestamp = (Long) rawTimestamp;
@@ -62,7 +61,7 @@ public class TimestampRouter {
                 replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
                 updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
             }
-            ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
updatedTopic);
+            ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
         }
     }
 
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
index 8333ab25cf6..208ab32b0b9 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
 import org.apache.camel.InvalidPayloadException;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 public class ValueToKey {
@@ -48,7 +47,7 @@ public class ValueToKey {
             }
         }
 
-        ex.getMessage().setHeader(KafkaConstants.KEY, key);
+        ex.getMessage().setHeader("kafka.KEY", key);
     }
 
     boolean filterNames(String fieldName, List<String> splittedFields) {
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
index c1837a86718..d782a05fb89 100644
--- 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.kamelet.utils.transform;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
@@ -42,10 +41,10 @@ class RegexRouterTest {
     void shouldReplaceFieldToPlainJson() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic);
+        exchange.getMessage().setHeader("kafka.TOPIC", topic);
 
         processor.process(".*ll.*", "newTopic", exchange);
 
-        Assertions.assertEquals("newTopic", 
exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC));
+        Assertions.assertEquals("newTopic", 
exchange.getMessage().getHeader("kafka.OVERRIDE_TOPIC"));
     }
 }

Reply via email to