CodeSmell commented on code in PR #11935:
URL: https://github.com/apache/camel/pull/11935#discussion_r1386689966


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##########
@@ -112,30 +115,56 @@ public ProcessingResult processExchange(
             exchange.setException(e);
         }
         if (exchange.getException() != null) {
-            boolean breakOnErrorExit = processException(exchange, partition, lastResult.getPartitionLastOffset(),
+            
+            LOG.debug("an exception was thrown for record at partition {} and offset {}",
+                    record.partition(), record.offset());
+            
+            boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult,
                     exceptionHandler);
-            return new ProcessingResult(breakOnErrorExit, lastResult.getPartitionLastOffset(), true);
+            
+            return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
         } else {
-            return new ProcessingResult(false, record.offset(), exchange.getException() != null);
+            return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null);
         }
     }
 
     private boolean processException(
-            Exchange exchange, TopicPartition partition, long partitionLastOffset,
+            Exchange exchange, TopicPartition topicPartition, 
+            ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
             ExceptionHandler exceptionHandler) {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
+            
+            if (lastResult.getPartition() != -1 &&
+                lastResult.getPartition() != record.partition()) {
+                LOG.error("about to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " + 
+                        " The last result was on partition {} with offset {} but was expecting partition {} with offset {}",
+                        topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(), 
+                        record.partition(), record.offset());
+            }
+            
             // we are failing and we should break out
             if (LOG.isWarnEnabled()) {
-                LOG.warn("Error during processing {} from topic: {}", exchange, partition.topic(), exchange.getException());
-                LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset);
+                LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exchange.getException());
+                LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.", 
+                        record.offset(), record.partition());
             }
 
-            // force commit, so we resume on next poll where we failed except when the failure happened
-            // at the first message in a poll
-            if (partitionLastOffset != AbstractCommitManager.START_OFFSET) {
-                commitManager.forceCommit(partition, partitionLastOffset);
+            // force commit, so we resume on next poll where we failed 
+            // except when the failure happened at the first message in a poll
+            if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
+                // the record we are processing had the error 
+                // so we will force commit the offset prior 
+                // this will enable the current desired behavior to 
+                // retry the message 1 more time
+                //
+                // Note: without a more extensive look at handling of breakOnFirstError
+                // we will still need the lastResult so that we don't force 
+                // retrying this message over and over
+                //if (configuration.isBreakOnFirstErrorWithRetry()) {
+                    commitManager.forceCommit(topicPartition, record.offset() - 1);

Review Comment:
   I am fine with whatever the Camel team thinks is best.
   The change here, which uses `forceCommit`, fixes the 2 reported issues, so I
   would like to see at least that move forward if you don't see any problems.
   
   Looking at the code, if we made some further changes, we might also be able
   to remove the `lastResult` object that is passed around, since we already
   have access to the Kafka record.
   
   I can update the docs to describe whatever the new behavior ends up being,
   but I may need a pointer on where and how to do that.
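
To make the offset arithmetic behind the `forceCommit` change easier to discuss, here is a minimal, library-free sketch. It is an illustration, not the Camel implementation. The class and method names are hypothetical, the `START_OFFSET` value of `-1` is an assumption standing in for `AbstractCommitManager.START_OFFSET`, and `resumePosition` assumes the commit manager commits the passed offset plus one (the usual Kafka convention that the committed position is the next record to read).

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of camel-kafka.
public final class BreakOnFirstErrorSketch {

    // Assumed sentinel for "nothing processed yet in this poll".
    static final long START_OFFSET = -1;

    private BreakOnFirstErrorSketch() {
    }

    // Mirrors the guard in the diff: skip the force commit when the failure
    // happened on the first message of a poll, otherwise commit the offset
    // just before the failed record.
    static OptionalLong offsetToForceCommit(long failedRecordOffset, long lastProcessedOffset) {
        if (lastProcessedOffset == START_OFFSET) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(failedRecordOffset - 1);
    }

    // Assumption: committing offset N means the consumer resumes at N + 1.
    static long resumePosition(long forceCommittedOffset) {
        return forceCommittedOffset + 1;
    }
}
```

Under these assumptions, a failure at offset 42 after offset 41 was processed leads to a force commit of 41, so the next poll resumes at 42 and the failed record is retried.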



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.