orpiske commented on code in PR #12879: URL: https://github.com/apache/camel/pull/12879#discussion_r1463974918
########## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java: ########## @@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { private final Processor processor; private final CommitManager commitManager; + private record CommitSynchronization(CommitManager commitManager) implements Synchronization { + + @Override + public void onComplete(Exchange exchange) { + final List<?> exchanges = exchange.getMessage().getBody(List.class); + + // Ensure we are actually receiving what we are asked for + if (exchanges == null || exchanges.isEmpty()) { + LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty"); + return; + } + + LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName()); + commitManager.commit(); + } + + @Override + public void onFailure(Exchange exchange) { + LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed"); Review Comment: Indeed! However, it will only happen if the user is not handling the error correctly (as described on the documentation). We could add a flag, so that it logs only once ... but I also feel that it may be hard for the users to notice the problem. Maybe we could do that + also increase the log level to error, so that the users can find with more easily with `grep` or their observability tools. Do you have any suggestions how we could this more elegantly? (Or, if we have solved this problem in some other component, just let me know and I'll do the same). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org