orpiske commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1463974918


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private record CommitSynchronization(CommitManager commitManager) 
implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = 
exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the 
expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", 
exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the 
exchanged has failed");

Review Comment:
   Indeed! 
   
   However, it will only happen if the user is not handling the error correctly 
(as described on the documentation). 
   
   We could add a flag, so that it logs only once ... but I also feel that it 
may be hard for the users to notice the problem. Maybe we could do that + also 
increase the log level to error, so that the users can find with more easily 
with `grep` or their observability tools. 
   
   Do you have any suggestions how we could this more elegantly? (Or, if we 
have solved this problem in some other component, just let me know and I'll do 
the same).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to