lburgazzoli commented on a change in pull request #1091:
URL: 
https://github.com/apache/camel-kafka-connector/pull/1091#discussion_r589037565



##########
File path: 
core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -210,6 +255,18 @@ private long remaining(long startPollEpochMilli, long 
maxPollDuration)  {
         return records.isEmpty() ? null : records;
     }
 
+    @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) 
throws InterruptedException {

Review comment:
       who is throwing InterrupedException ? we need to be sure not to left an 
exchange uncompleted and not tracked any more in the array

##########
File path: 
core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -177,31 +207,46 @@ private long remaining(long startPollEpochMilli, long 
maxPollDuration)  {
             Map<String, String> sourceOffset = 
Collections.singletonMap("position", exchange.getExchangeId());
 
             final Object messageHeaderKey = camelMessageHeaderKey != null ? 
exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-            final Object messageBodyValue = exchange.getMessage().getBody();
+            Object messageBodyValue = exchange.getMessage().getBody();
 
             final Schema messageKeySchema = messageHeaderKey != null ? 
SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? 
SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
             final long timestamp = calculateTimestamp(exchange);
 
+            // take in account Cached camel streams
+            if (messageBodyValue instanceof StreamCache) {
+                StreamCache sc = (StreamCache) messageBodyValue;
+                // reset to be sure that the cache is ready to be used before 
sending it in the record (could be useful for SMTs)
+                sc.reset();
+                try {
+                    messageBodyValue = sc.copy(exchange);
+                } catch (IOException e) {
+                    e.printStackTrace();

Review comment:
       log or rethrow




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to