This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 3635a7c7cd4 kafka manual commit classloading issue (#17372) 3635a7c7cd4 is described below commit 3635a7c7cd413533e490149a40cd967d06478da0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Mar 6 12:13:52 2025 +0000 kafka manual commit classloading issue (#17372) CAMEL-21833: Move kamelet kafka manual commit to camel-kafka due to classloading problems. Remove dependency on camel-kafka in camel-kameleets. --- .../camel/component}/kafka/BatchManualCommit.java | 30 ++++++++++++++-------- .../camel/component}/kafka/ManualCommit.java | 10 ++++++-- .../consumer/DefaultKafkaManualAsyncCommit.java | 1 - .../kafka/consumer/DefaultKafkaManualCommit.java | 10 +++++--- .../batching/KafkaRecordBatchingProcessor.java | 5 ++++ components/camel-kamelet/pom.xml | 5 ---- .../kafka/KafkaHeaderDeserializer.java | 6 +---- .../utils/transform/MessageTimestampRouter.java | 5 ++-- .../kamelet/utils/transform/RegexRouter.java | 5 ++-- .../kamelet/utils/transform/TimestampRouter.java | 5 ++-- .../kamelet/utils/transform/kafka/ValueToKey.java | 3 +-- .../kamelet/utils/transform/RegexRouterTest.java | 5 ++-- 12 files changed, 49 insertions(+), 41 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java similarity index 55% rename from components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java index 8b97d3b9ff1..3bf8c2495f2 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java @@ -14,29 +14,39 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.kamelet.utils.transform.kafka; +package org.apache.camel.component.kafka; import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class BatchManualCommit implements Processor { + private static final Logger LOG = LoggerFactory.getLogger(BatchManualCommit.class); + @Override public void process(Exchange exchange) throws Exception { - List<?> exchanges = exchange.getMessage().getBody(List.class); - if (exchanges.size() > 0) { - final Object tmp = exchanges.get(exchanges.size() - 1); - if (tmp instanceof Exchange element) { - KafkaManualCommit manual - = element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); - if (manual != null) { - manual.commit(); + KafkaManualCommit manual + = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + if (manual == null) { + List<?> exchanges = exchange.getMessage().getBody(List.class); + if (exchanges != null && !exchanges.isEmpty()) { + Object obj = exchanges.get(exchanges.size() - 1); + if (obj instanceof Exchange last) { + manual = last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); } } } + + if (manual != null) { + LOG.debug("Performing Kafka Batch manual commit: {}", manual); + manual.commit(); + } else { + LOG.debug("Cannot perform Kafka Batch manual commit due header: {} is missing", KafkaConstants.MANUAL_COMMIT); + } } } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java similarity index 76% rename from components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java index db92916f2b9..8fa06ab73aa 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java @@ -14,20 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.kamelet.utils.transform.kafka; +package org.apache.camel.component.kafka; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ManualCommit implements Processor { + private static final Logger LOG = LoggerFactory.getLogger(BatchManualCommit.class); + @Override public void process(Exchange exchange) throws Exception { KafkaManualCommit manual = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); if (manual != null) { + LOG.debug("Performing Kafka manual commit: {}", manual); manual.commit(); + } else { + LOG.debug("Cannot perform Kafka manual commit due header: {} is missing", KafkaConstants.MANUAL_COMMIT); } } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java index e38761c4ef7..7eea8cdc5f4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java @@ -23,7 +23,6 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl KafkaManualCommitFactory.KafkaRecordPayload recordPayload, CommitManager commitManager) { super(camelExchangePayload, recordPayload); - this.commitManager = commitManager; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java index 5aa71a8ccfc..617526cae0a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java @@ -66,8 +66,6 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit { /** * Gets the Camel Exchange payload - * - * @return */ public KafkaManualCommitFactory.CamelExchangePayload getCamelExchangePayload() { return camelExchangePayload; @@ -75,10 +73,14 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit { /** * Gets the Kafka record payload - * - * @return */ public KafkaManualCommitFactory.KafkaRecordPayload getKafkaRecordPayload() { return kafkaRecordPayload; } + + @Override + public String toString() { + return "KafkaManualCommit[topic=" + getTopicName() + ", offset=" + getRecordOffset() + "]"; + } + } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index 09247cddc39..44193f89c71 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -159,8 +159,13 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { Message message = exchange.getMessage(); var exchanges = exchangeList.stream().toList(); message.setBody(exchanges); + try { if (configuration.isAllowManualCommit()) { + Exchange last = exchanges.isEmpty() ? null : exchanges.get(exchanges.size() - 1); + if (last != null) { + message.setHeader(KafkaConstants.MANUAL_COMMIT, last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT)); + } manualCommitResultProcessing(camelKafkaConsumer, exchange); } else { autoCommitResultProcessing(camelKafkaConsumer, exchange, exchanges.size()); diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml index b6f9a7149b5..018a567d0f1 100644 --- a/components/camel-kamelet/pom.xml +++ b/components/camel-kamelet/pom.xml @@ -71,11 +71,6 @@ <version>1.2</version> <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-kafka</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jackson</artifactId> diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java index 6cb8f2f1b02..7f5eae8041a 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.support.SimpleTypeConverter; /** @@ -84,12 +83,9 @@ public class KafkaHeaderDeserializer implements Processor { /** * Exclude special Kafka headers from auto deserialization. - * - * @param entry - * @return */ private boolean shouldDeserialize(Map.Entry<String, Object> entry) { - return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT); + return !entry.getKey().equals("kafka.HEADERS") && !entry.getKey().equals("CamelKafkaManualCommit"); } public void setEnabled(String enabled) { diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java index 3b1ef2eb3dc..048a74ec032 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java @@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.Exchange; import org.apache.camel.ExchangeProperty; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.util.ObjectHelper; public class MessageTimestampRouter { @@ -55,7 +54,7 @@ public class MessageTimestampRouter { } Object rawTimestamp = null; - String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class); + String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class); for (String key : splittedKeys) { if (ObjectHelper.isNotEmpty(key)) { rawTimestamp = body.get(key); @@ -83,7 +82,7 @@ public class MessageTimestampRouter { replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement("")); updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp)); } - ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic); + ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic); } } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java index 517f1d0b787..4bbc27a56ab 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java @@ -21,7 +21,6 @@ import java.util.regex.Pattern; import org.apache.camel.Exchange; import org.apache.camel.ExchangeProperty; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.util.ObjectHelper; public class RegexRouter { @@ -29,12 +28,12 @@ public class RegexRouter { public void process( @ExchangeProperty("regex") String regex, @ExchangeProperty("replacement") String replacement, Exchange ex) { Pattern regexPattern = Pattern.compile(regex); - String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class); + String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class); if (ObjectHelper.isNotEmpty(topicName)) { final Matcher matcher = regexPattern.matcher(topicName); if (matcher.matches()) { final String topicUpdated = matcher.replaceFirst(replacement); - ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, topicUpdated); + ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", topicUpdated); } } String ceType = ex.getMessage().getHeader("ce-type", String.class); diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java index 9c660497f0e..db41dcbf013 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java @@ -25,7 +25,6 @@ import java.util.regex.Pattern; import org.apache.camel.Exchange; import org.apache.camel.ExchangeProperty; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.util.ObjectHelper; public class TimestampRouter { @@ -41,7 +40,7 @@ public class TimestampRouter { fmt.setTimeZone(TimeZone.getTimeZone("UTC")); Long timestamp = null; - String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class); + String topicName = ex.getMessage().getHeader("kafka.TOPIC", String.class); Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName); if (rawTimestamp instanceof Long) { timestamp = (Long) rawTimestamp; @@ -62,7 +61,7 @@ public class TimestampRouter { replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement("")); updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp)); } - ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic); + ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic); } } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java index 8333ab25cf6..208ab32b0b9 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.Exchange; import org.apache.camel.ExchangeProperty; import org.apache.camel.InvalidPayloadException; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.util.ObjectHelper; public class ValueToKey { @@ -48,7 +47,7 @@ public class ValueToKey { } } - ex.getMessage().setHeader(KafkaConstants.KEY, key); + ex.getMessage().setHeader("kafka.KEY", key); } boolean filterNames(String fieldName, List<String> splittedFields) { diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java index c1837a86718..d782a05fb89 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java @@ -17,7 +17,6 @@ package org.apache.camel.component.kamelet.utils.transform; import org.apache.camel.Exchange; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.Assertions; @@ -42,10 +41,10 @@ class RegexRouterTest { void shouldReplaceFieldToPlainJson() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic); + exchange.getMessage().setHeader("kafka.TOPIC", topic); processor.process(".*ll.*", "newTopic", exchange); - Assertions.assertEquals("newTopic", exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC)); + Assertions.assertEquals("newTopic", exchange.getMessage().getHeader("kafka.OVERRIDE_TOPIC")); } }