This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new de36a0a Some followup improvements for #969 and #202. de36a0a is described below commit de36a0a2a63ffd947b19bcf3951e782105c7fc5e Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Mon Mar 8 15:57:09 2021 +0100 Some followup improvements for #969 and #202. --- .../camel/kafkaconnector/CamelSourceTask.java | 24 ++++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 51b055d..00ce145 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -16,7 +16,6 @@ */ package org.apache.camel.kafkaconnector; -import java.io.IOException; import java.math.BigDecimal; import java.time.Instant; import java.util.ArrayList; @@ -219,11 +218,6 @@ public class CamelSourceTask extends SourceTask { StreamCache sc = (StreamCache) messageBodyValue; // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs) sc.reset(); - try { - messageBodyValue = sc.copy(exchange); - } catch (IOException e) { - e.printStackTrace(); - } } for (String singleTopic : topics) { CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema, @@ -256,15 +250,23 @@ public class CamelSourceTask extends SourceTask { } @Override - public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException { + public void commitRecord(SourceRecord record, RecordMetadata metadata) { + LOG.debug("Committing record: {} with metadata: {}", record, metadata); ///XXX: this should be a safe cast please see: https://issues.apache.org/jira/browse/KAFKA-12391 Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck(); 
LOG.debug("Committing record with claim check number: {}", claimCheck); Exchange correlatedExchange = exchangesWaitingForAck[claimCheck]; - exchangesWaitingForAck[claimCheck] = null; - freeSlots.add(claimCheck); - UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG); - LOG.debug("Record with claim check number: {} committed.", claimCheck); + try { + UnitOfWorkHelper.doneSynchronizations(correlatedExchange, correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG); + LOG.debug("Record with claim check number: {} committed.", claimCheck); + } catch (Throwable t) { + LOG.error("Exception during Unit Of Work completion: {} caused by: {}", t.getMessage(), t.getCause()); + throw new RuntimeException(t); + } finally { + exchangesWaitingForAck[claimCheck] = null; + freeSlots.add(claimCheck); + LOG.debug("Claim check number: {} freed.", claimCheck); + } } @Override