This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch km in repository https://gitbox.apache.org/repos/asf/camel.git
commit f76274400ca0fedae499b53c16e0454886864156 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Mar 6 12:02:24 2025 +0100 CAMEL-21833: Move kamelet kafka manual commit to camel-kafka due to classloading problems. --- .../camel/component}/kafka/BatchManualCommit.java | 30 ++++++++++++++-------- .../apache/camel/component/kafka/ManualCommit.java | 23 +++++++++++++++++ .../consumer/DefaultKafkaManualAsyncCommit.java | 1 - .../kafka/consumer/DefaultKafkaManualCommit.java | 10 +++++--- .../batching/KafkaRecordBatchingProcessor.java | 5 ++++ .../utils/transform/kafka/BatchManualCommit.java | 1 + .../utils/transform/kafka/ManualCommit.java | 1 + 7 files changed, 56 insertions(+), 15 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java similarity index 55% copy from components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java index 8b97d3b9ff1..3bf8c2495f2 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java @@ -14,29 +14,39 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.kamelet.utils.transform.kafka; +package org.apache.camel.component.kafka; import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class BatchManualCommit implements Processor { + private static final Logger LOG = LoggerFactory.getLogger(BatchManualCommit.class); + @Override public void process(Exchange exchange) throws Exception { - List<?> exchanges = exchange.getMessage().getBody(List.class); - if (exchanges.size() > 0) { - final Object tmp = exchanges.get(exchanges.size() - 1); - if (tmp instanceof Exchange element) { - KafkaManualCommit manual - = element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); - if (manual != null) { - manual.commit(); + KafkaManualCommit manual + = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + if (manual == null) { + List<?> exchanges = exchange.getMessage().getBody(List.class); + if (exchanges != null && !exchanges.isEmpty()) { + Object obj = exchanges.get(exchanges.size() - 1); + if (obj instanceof Exchange last) { + manual = last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); } } } + + if (manual != null) { + LOG.debug("Performing Kafka Batch manual commit: {}", manual); + manual.commit(); + } else { + LOG.debug("Cannot perform Kafka Batch manual commit due header: {} is missing", KafkaConstants.MANUAL_COMMIT); + } } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java new file mode 100644 index 00000000000..7d1d469b94c --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java @@ -0,0 +1,23 @@ +package org.apache.camel.component.kafka; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ManualCommit implements Processor { + + private static final Logger LOG = LoggerFactory.getLogger(BatchManualCommit.class); + + @Override + public void process(Exchange exchange) throws Exception { + KafkaManualCommit manual = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + if (manual != null) { + LOG.debug("Performing Kafka manual commit: {}", manual); + manual.commit(); + } else { + LOG.debug("Cannot perform Kafka manual commit due header: {} is missing", KafkaConstants.MANUAL_COMMIT); + } + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java index e38761c4ef7..7eea8cdc5f4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java @@ -23,7 +23,6 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl KafkaManualCommitFactory.KafkaRecordPayload recordPayload, CommitManager commitManager) { super(camelExchangePayload, recordPayload); - this.commitManager = commitManager; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java index 5aa71a8ccfc..617526cae0a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java @@ -66,8 +66,6 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit { /** * Gets the Camel Exchange payload - * - * @return */ public KafkaManualCommitFactory.CamelExchangePayload getCamelExchangePayload() { return camelExchangePayload; @@ -75,10 +73,14 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit { /** * Gets the Kafka record payload - * - * @return */ public KafkaManualCommitFactory.KafkaRecordPayload getKafkaRecordPayload() { return kafkaRecordPayload; } + + @Override + public String toString() { + return "KafkaManualCommit[topic=" + getTopicName() + ", offset=" + getRecordOffset() + "]"; + } + } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index 09247cddc39..44193f89c71 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -159,8 +159,13 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { Message message = exchange.getMessage(); var exchanges = exchangeList.stream().toList(); message.setBody(exchanges); + try { if (configuration.isAllowManualCommit()) { + Exchange last = exchanges.isEmpty() ? null : exchanges.get(exchanges.size() - 1); + if (last != null) { + message.setHeader(KafkaConstants.MANUAL_COMMIT, last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT)); + } manualCommitResultProcessing(camelKafkaConsumer, exchange); } else { autoCommitResultProcessing(camelKafkaConsumer, exchange, exchanges.size()); diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java index 8b97d3b9ff1..59258773a0f 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java @@ -23,6 +23,7 @@ import org.apache.camel.Processor; import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +@Deprecated public class BatchManualCommit implements Processor { @Override diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java index db92916f2b9..97bb8040a6d 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java @@ -21,6 +21,7 @@ import org.apache.camel.Processor; import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +@Deprecated public class ManualCommit implements Processor { @Override