orpiske commented on code in PR #11920: URL: https://github.com/apache/camel/pull/11920#discussion_r1385250145
########## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ########## @@ -112,30 +115,48 @@ public ProcessingResult processExchange( exchange.setException(e); } if (exchange.getException() != null) { - boolean breakOnErrorExit = processException(exchange, partition, lastResult.getPartitionLastOffset(), + + LOG.debug("an exception was thrown for record at partition {} and offset {}", + record.partition(), record.offset()); + + boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult, exceptionHandler); - return new ProcessingResult(breakOnErrorExit, lastResult.getPartitionLastOffset(), true); + + return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true); } else { - return new ProcessingResult(false, record.offset(), exchange.getException() != null); + return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null); } } private boolean processException( - Exchange exchange, TopicPartition partition, long partitionLastOffset, + Exchange exchange, TopicPartition topicPartition, + ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) { // processing failed due to an unhandled exception, what should we do if (configuration.isBreakOnFirstError()) { + + if (lastResult.getPartition() != -1 && + lastResult.getPartition() != record.partition()) { + LOG.error("about to process an exception with UNEXPECTED partition & offset. Got topic partition {}. 
" + + " The last result was on partition {} with offset {} but was expecting partition {} with offset {}", + topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(), + record.partition(), record.offset()); + } + // we are failing and we should break out if (LOG.isWarnEnabled()) { - LOG.warn("Error during processing {} from topic: {}", exchange, partition.topic(), exchange.getException()); - LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset); + LOG.warn("Error during processing {} from topic: {}", exchange, topicPartition.topic(), exchange.getException()); Review Comment: This is the only item remaining. As it is now, it is logging the message. Instead, I would like it to be exactly like this: ``` LOG.warn("Error during processing exchange {} from topic {}: {}", exchange, topicPartition.topic(), exchange.getException() == null ? null : exchange.getException().getMessage(), exchange.getException()); ``` Just copy/paste the code above and it should be fine ⬆️ ########## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ########## @@ -112,30 +115,48 @@ public ProcessingResult processExchange( exchange.setException(e); } if (exchange.getException() != null) { - boolean breakOnErrorExit = processException(exchange, partition, lastResult.getPartitionLastOffset(), + + LOG.debug("an exception was thrown for record at partition {} and offset {}", + record.partition(), record.offset()); + + boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult, exceptionHandler); - return new ProcessingResult(breakOnErrorExit, lastResult.getPartitionLastOffset(), true); + + return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true); } else { - return new ProcessingResult(false, record.offset(), exchange.getException() != null); + return new ProcessingResult(false, 
record.partition(), record.offset(), exchange.getException() != null); } } private boolean processException( - Exchange exchange, TopicPartition partition, long partitionLastOffset, + Exchange exchange, TopicPartition topicPartition, + ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) { // processing failed due to an unhandled exception, what should we do if (configuration.isBreakOnFirstError()) { + + if (lastResult.getPartition() != -1 && + lastResult.getPartition() != record.partition()) { + LOG.error("about to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " + + " The last result was on partition {} with offset {} but was expecting partition {} with offset {}", + topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(), + record.partition(), record.offset()); + } + // we are failing and we should break out if (LOG.isWarnEnabled()) { - LOG.warn("Error during processing {} from topic: {}", exchange, partition.topic(), exchange.getException()); - LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset); + LOG.warn("Error during processing {} from topic: {}", exchange, topicPartition.topic(), exchange.getException()); Review Comment: This is the only item remaining. As it is now, it is logging the message. Instead, I would like it to be exactly like this: ``` LOG.warn("Error during processing exchange {} from topic {}: {}", exchange, topicPartition.topic(), exchange.getException() == null ? null : exchange.getException().getMessage(), exchange.getException()); ``` Just copy/paste the code above and it should be fine ⬆️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org