This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new de36a0a  Some follow-up improvements for #969 and #202.
de36a0a is described below

commit de36a0a2a63ffd947b19bcf3951e782105c7fc5e
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Mon Mar 8 15:57:09 2021 +0100

    Some follow-up improvements for #969 and #202.
---
 .../camel/kafkaconnector/CamelSourceTask.java      | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 51b055d..00ce145 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -219,11 +218,6 @@ public class CamelSourceTask extends SourceTask {
                 StreamCache sc = (StreamCache) messageBodyValue;
                 // reset to be sure that the cache is ready to be used before 
sending it in the record (could be useful for SMTs)
                 sc.reset();
-                try {
-                    messageBodyValue = sc.copy(exchange);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
             }
             for (String singleTopic : topics) {
                 CamelSourceRecord camelRecord = new 
CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, 
messageKeySchema,
@@ -256,15 +250,23 @@ public class CamelSourceTask extends SourceTask {
     }
 
     @Override
-    public void commitRecord(SourceRecord record, RecordMetadata metadata) 
throws InterruptedException {
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
+        LOG.debug("Committing record: {} with metadata: {}", record, metadata);
         ///XXX: this should be a safe cast please see: 
https://issues.apache.org/jira/browse/KAFKA-12391
         Integer claimCheck = ((CamelSourceRecord)record).getClaimCheck();
         LOG.debug("Committing record with claim check number: {}", claimCheck);
         Exchange correlatedExchange = exchangesWaitingForAck[claimCheck];
-        exchangesWaitingForAck[claimCheck] = null;
-        freeSlots.add(claimCheck);
-        UnitOfWorkHelper.doneSynchronizations(correlatedExchange, 
correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
-        LOG.debug("Record with claim check number: {} committed.", claimCheck);
+        try {
+            UnitOfWorkHelper.doneSynchronizations(correlatedExchange, 
correlatedExchange.adapt(ExtendedExchange.class).handoverCompletions(), LOG);
+            LOG.debug("Record with claim check number: {} committed.", 
claimCheck);
+        } catch (Throwable t) {
+            LOG.error("Exception during Unit Of Work completion: {} caused by: 
{}", t.getMessage(), t.getCause());
+            throw new RuntimeException(t);
+        } finally {
+            exchangesWaitingForAck[claimCheck] = null;
+            freeSlots.add(claimCheck);
+            LOG.debug("Claim check number: {} freed.", claimCheck);
+        }
     }
 
     @Override

Reply via email to