orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389114928


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##########
@@ -149,14 +146,30 @@ private boolean processException(
                 Exception exc = exchange.getException();
                 LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
                         exc.getMessage());
-                LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
-                        lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
+                LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.", 
+                        record.offset(), record.partition());
             }
 
             // force commit, so we resume on next poll where we failed 
             // except when the failure happened at the first message in a poll
             if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
-                commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
+                // the record we are processing had the error 
+                // so we will force commit the offset prior 
+                // this will enable the current desired behavior to 
+                // retry the message 1 more time
+                //
+                // Note: without a more extensive look at handling of 
breakOnFirstError
+                // we will still need the lastResult so that we don't force 
+                // retrying this message over and over
+                // commitManager.forceCommit(topicPartition, record.offset() - 
1);
+                
+                // we should just do a commit (vs the original forceCommit)
+                // when route uses NOOP Commit Manager it will rely
+                // on the route implementation to explicitly commit offset
+                // when route uses Synch/Asynch Commit Manager it will 
+                // ALWAYS commit the offset for the failing record
+                // and will ALWAYS retry it
+                commitManager.commit(topicPartition);
             }

Review Comment:
   > This is the key change in behavior It goes beyond a fix and gives route 
implementers ability to choose how breakOnFirstError should work depending on 
the commit manager they use
   
   Thanks for the heads up. 
   
   I am OK with this given the scope of the change. The important thing here is 
that it needs to be documented on both the component doc as well as on the 
upgrade guide. 
   
   



##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##########
@@ -149,14 +146,30 @@ private boolean processException(
                 Exception exc = exchange.getException();
                 LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
                         exc.getMessage());
-                LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
-                        lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
+                LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.", 
+                        record.offset(), record.partition());
             }
 
             // force commit, so we resume on next poll where we failed 
             // except when the failure happened at the first message in a poll
             if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
-                commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
+                // the record we are processing had the error 
+                // so we will force commit the offset prior 
+                // this will enable the current desired behavior to 
+                // retry the message 1 more time
+                //
+                // Note: without a more extensive look at handling of 
breakOnFirstError
+                // we will still need the lastResult so that we don't force 
+                // retrying this message over and over
+                // commitManager.forceCommit(topicPartition, record.offset() - 
1);
+                
+                // we should just do a commit (vs the original forceCommit)
+                // when route uses NOOP Commit Manager it will rely
+                // on the route implementation to explicitly commit offset
+                // when route uses Synch/Asynch Commit Manager it will 
+                // ALWAYS commit the offset for the failing record
+                // and will ALWAYS retry it
+                commitManager.commit(topicPartition);
             }

Review Comment:
   > This is the key change in behavior It goes beyond a fix and gives route 
implementers ability to choose how breakOnFirstError should work depending on 
the commit manager they use
   
   Thanks for the heads up. 
   
   I am OK with this given the scope of the fix. The important thing here is 
that it needs to be documented on both the component doc as well as on the 
upgrade guide. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to