valdar commented on a change in pull request #1091: URL: https://github.com/apache/camel-kafka-connector/pull/1091#discussion_r589523811
########## File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java ##########
@@ -177,31 +207,46 @@ private long remaining(long startPollEpochMilli, long maxPollDuration) {
             Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());

             final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-            final Object messageBodyValue = exchange.getMessage().getBody();
+            Object messageBodyValue = exchange.getMessage().getBody();

             final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
             final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;

             final long timestamp = calculateTimestamp(exchange);

+            // take in account Cached camel streams
+            if (messageBodyValue instanceof StreamCache) {
+                StreamCache sc = (StreamCache) messageBodyValue;
+                // reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
+                sc.reset();
+                try {
+                    messageBodyValue = sc.copy(exchange);
+                } catch (IOException e) {
+                    e.printStackTrace();

Review comment:
done in https://github.com/apache/camel-kafka-connector/pull/1093

########## File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java ##########
@@ -210,6 +255,18 @@ private long remaining(long startPollEpochMilli, long maxPollDuration) {
         return records.isEmpty() ? null : records;
     }

+    @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {

Review comment:
done in https://github.com/apache/camel-kafka-connector/pull/1093

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org