CodeSmell commented on code in PR #11935: URL: https://github.com/apache/camel/pull/11935#discussion_r1386624448
########## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ########## @@ -112,30 +115,56 @@ public ProcessingResult processExchange( exchange.setException(e); } if (exchange.getException() != null) { - boolean breakOnErrorExit = processException(exchange, partition, lastResult.getPartitionLastOffset(), + + LOG.debug("an exception was thrown for record at partition {} and offset {}", + record.partition(), record.offset()); + + boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult, exceptionHandler); - return new ProcessingResult(breakOnErrorExit, lastResult.getPartitionLastOffset(), true); + + return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true); } else { - return new ProcessingResult(false, record.offset(), exchange.getException() != null); + return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null); } } private boolean processException( - Exchange exchange, TopicPartition partition, long partitionLastOffset, + Exchange exchange, TopicPartition topicPartition, + ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) { // processing failed due to an unhandled exception, what should we do if (configuration.isBreakOnFirstError()) { + + if (lastResult.getPartition() != -1 && + lastResult.getPartition() != record.partition()) { + LOG.error("about to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " + + " The last result was on partition {} with offset {} but was expecting partition {} with offset {}", + topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(), + record.partition(), record.offset()); + } + // we are failing and we should break out if (LOG.isWarnEnabled()) { - LOG.warn("Error during processing {} from topic: {}", exchange, partition.topic(), exchange.getException()); - LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset); + LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exchange.getException()); + LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.", + record.offset(), record.partition()); } - // force commit, so we resume on next poll where we failed except when the failure happened - // at the first message in a poll - if (partitionLastOffset != AbstractCommitManager.START_OFFSET) { - commitManager.forceCommit(partition, partitionLastOffset); + // force commit, so we resume on next poll where we failed + // except when the failure happened at the first message in a poll + if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) { + // the record we are processing had the error + // so we will force commit the offset prior + // this will enable the current desired behavior to + // retry the message 1 more time + // + // Note: without a more extensive look at handling of breakOnFirstError + // we will still need the lastResult so that we don't force + // retrying this message over and over + //if (configuration.isBreakOnFirstErrorWithRetry()) { + commitManager.forceCommit(topicPartition, record.offset() - 1); Review Comment: @orpiske some thoughts here on this The `forceCommit` was what we had here so kept that for now. But based on some of the other conversations I think it may be worth changing that to `commit`. That would defer how `breakOnFirstError` would work based on the `commitManager` - no op would mean the route implementation would have to handle the offset using `KafkaManualCommit` - using synch/asynch would mean that the record with the error would be retried the retry will vary still based on other configuration which is what I think contributes to this flag being so complex and why there are many opinions on how it should work (CAMEL-20089) This may be a way to alter some of the behavior in 3.x until a refactor in 4.x -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org