This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push: new f1cc4f9fb39 remove lastResult from Kafka Camel component (#12002) f1cc4f9fb39 is described below commit f1cc4f9fb3981035bfab37830600c29f8e364779 Author: Mike Barlotta <codesm...@users.noreply.github.com> AuthorDate: Tue Jan 9 09:11:48 2024 -0500 remove lastResult from Kafka Camel component (#12002) --- .../camel/component/kafka/KafkaFetchRecords.java | 46 ++++----------------- .../consumer/support/KafkaRecordProcessor.java | 48 +++++++++------------- .../support/KafkaRecordProcessorFacade.java | 38 +++++++---------- .../kafka/consumer/support/ProcessingResult.java | 20 +-------- 4 files changed, 44 insertions(+), 108 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index e13f7d86e99..fe9b3bdf9ee 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -297,8 +297,7 @@ public class KafkaFetchRecords implements Runnable { } protected void startPolling() { - long partitionLastOffset = -1; - + try { /* * We lock the processing of the record to avoid raising a WakeUpException as a result to a call @@ -307,7 +306,8 @@ public class KafkaFetchRecords implements Runnable { lock.lock(); long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); - + Duration pollDuration = Duration.ofMillis(pollTimeoutMs); + if (LOG.isTraceEnabled()) { LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs); } @@ -315,10 +315,6 @@ public class KafkaFetchRecords implements Runnable { KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade( kafkaConsumer, threadId, commitManager, consumerListener); - Duration 
pollDuration = Duration.ofMillis(pollTimeoutMs); - - ProcessingResult lastResult = null; - while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); if (consumerListener != null) { @@ -327,43 +323,15 @@ public class KafkaFetchRecords implements Runnable { } } - if (lastResult != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("This polling iteration is using lastresult on partition {} and offset {}", - lastResult.getPartition(), lastResult.getPartitionLastOffset()); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("This polling iteration is using lastresult of null"); - } - } - - ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult); - - if (result != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("This polling iteration had a result returned for partition {} and offset {}", - result.getPartition(), result.getPartitionLastOffset()); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("This polling iteration had a result returned as null"); - } - } - + ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords); updateTaskState(); + + // when breakOnFirstError we want to unsubscribe from Kafka if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) { LOG.debug("We hit an error ... 
setting flags to force reconnect"); // force re-connect setReconnect(true); setConnected(false); - } else { - lastResult = result; - - if (LOG.isTraceEnabled()) { - LOG.trace("Setting lastresult to partition {} and offset {}", - lastResult.getPartition(), lastResult.getPartitionLastOffset()); - } } } @@ -397,6 +365,8 @@ public class KafkaFetchRecords implements Runnable { e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage()); } + // why do we set this to -1 + long partitionLastOffset = -1; pollExceptionStrategy.handle(partitionLastOffset, e); } finally { // only close if not retry diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index e731e4c4763..1299bdc0ba8 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -23,7 +23,6 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.KafkaConstants; -import org.apache.camel.component.kafka.consumer.AbstractCommitManager; import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer; @@ -86,8 +85,7 @@ public class KafkaRecordProcessor { public ProcessingResult processExchange( Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext, - boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult, - ExceptionHandler exceptionHandler) { + boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler 
exceptionHandler) { Message message = exchange.getMessage(); @@ -114,33 +112,31 @@ public class KafkaRecordProcessor { } catch (Exception e) { exchange.setException(e); } + + ProcessingResult result = ProcessingResult.newUnprocessed(); if (exchange.getException() != null) { LOG.debug("An exception was thrown for record at partition {} and offset {}", record.partition(), record.offset()); - boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult, - exceptionHandler); - return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true); + boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler); + result = new ProcessingResult(breakOnErrorExit, true); } else { - return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null); + result = new ProcessingResult(false, exchange.getException() != null); } + + if (!result.isBreakOnErrorHit()) { + commitManager.recordOffset(topicPartition, record.offset()); + } + + return result; } private boolean processException( Exchange exchange, TopicPartition topicPartition, - ConsumerRecord<Object, Object> record, ProcessingResult lastResult, - ExceptionHandler exceptionHandler) { + ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) { // processing failed due to an unhandled exception, what should we do if (configuration.isBreakOnFirstError()) { - if (lastResult.getPartition() != -1 && - lastResult.getPartition() != record.partition()) { - LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. 
" + - " The last result was on partition {} with offset {} but was expecting partition {} with offset {}", - topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(), - record.partition(), record.offset()); - } - // we are failing and we should break out if (LOG.isWarnEnabled()) { Exception exc = exchange.getException(); @@ -150,17 +146,13 @@ public class KafkaRecordProcessor { record.offset(), record.partition()); } - // force commit, so we resume on next poll where we failed - // except when the failure happened at the first message in a poll - if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) { - // we should just do a commit (vs the original forceCommit) - // when route uses NOOP Commit Manager it will rely - // on the route implementation to explicitly commit offset - // when route uses Synch/Asynch Commit Manager it will - // ALWAYS commit the offset for the failing record - // and will ALWAYS retry it - commitManager.commit(topicPartition); - } + // we should just do a commit (vs the original forceCommit) + // when route uses NOOP Commit Manager it will rely + // on the route implementation to explicitly commit offset + // when route uses Synch/Asynch Commit Manager it will + // ALWAYS commit the offset for the failing record + // and will ALWAYS retry it + commitManager.commit(topicPartition); // continue to next partition return true; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java index 44573daa60d..b114c3ff8a6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java @@ -54,18 +54,16 @@ public 
class KafkaRecordProcessorFacade { return camelKafkaConsumer.isStopping(); } - public ProcessingResult processPolledRecords( - ConsumerRecords<Object, Object> allRecords, ProcessingResult resultFromPreviousPoll) { + public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) { logRecords(allRecords); + + ProcessingResult result = ProcessingResult.newUnprocessed(); Set<TopicPartition> partitions = allRecords.partitions(); Iterator<TopicPartition> partitionIterator = partitions.iterator(); LOG.debug("Poll received records on {} partitions", partitions.size()); - ProcessingResult lastResult - = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll; - while (partitionIterator.hasNext() && !isStopping()) { TopicPartition partition = partitionIterator.next(); @@ -76,34 +74,32 @@ public class KafkaRecordProcessorFacade { logRecordsInPartition(partitionRecords, partition); - while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) { + while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) { ConsumerRecord<Object, Object> record = recordIterator.next(); LOG.debug("Processing record on partition {} with offset {}", record.partition(), record.offset()); - lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, + result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), kafkaRecordProcessor, record); - LOG.debug( - "Processed record on partition {} with offset {} and got ProcessingResult for partition {} and offset {}", - record.partition(), record.offset(), lastResult.getPartition(), lastResult.getPartitionLastOffset()); + LOG.debug("Processed record on partition {} with offset {}", record.partition(), record.offset()); if (consumerListener != null) { - if (!consumerListener.afterProcess(lastResult)) { + if (!consumerListener.afterProcess(result)) { 
commitManager.commit(partition); - return lastResult; + return result; } } } - if (!lastResult.isBreakOnErrorHit()) { + if (!result.isBreakOnErrorHit()) { LOG.debug("Committing offset on successful execution"); // all records processed from partition so commit them commitManager.commit(partition); } } - return lastResult; + return result; } private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) { @@ -123,7 +119,6 @@ public class KafkaRecordProcessorFacade { TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, - final ProcessingResult lastResult, KafkaRecordProcessor kafkaRecordProcessor, ConsumerRecord<Object, Object> record) { @@ -131,18 +126,13 @@ public class KafkaRecordProcessorFacade { Exchange exchange = camelKafkaConsumer.createExchange(false); - ProcessingResult currentResult - = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext, - recordHasNext, record, lastResult, camelKafkaConsumer.getExceptionHandler()); - - if (!currentResult.isBreakOnErrorHit()) { - commitManager.recordOffset(partition, currentResult.getPartitionLastOffset()); - } - + ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext, + recordHasNext, record, camelKafkaConsumer.getExceptionHandler()); + // success so release the exchange camelKafkaConsumer.releaseExchange(exchange, false); - return currentResult; + return result; } private void logRecord(ConsumerRecord<Object, Object> record) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java index fe3afd6ee8d..87f88c6e23d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java +++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java @@ -17,24 +17,16 @@ package org.apache.camel.component.kafka.consumer.support; -import org.apache.camel.component.kafka.consumer.AbstractCommitManager; public final class ProcessingResult { private static final ProcessingResult UNPROCESSED_RESULT - = new ProcessingResult( - false, - AbstractCommitManager.NON_PARTITION, - AbstractCommitManager.START_OFFSET, false); + = new ProcessingResult(false, false); private final boolean breakOnErrorHit; - private final long lastPartition; - private final long partitionLastOffset; private final boolean failed; - ProcessingResult(boolean breakOnErrorHit, long lastPartition, long partitionLastOffset, boolean failed) { + ProcessingResult(boolean breakOnErrorHit, boolean failed) { this.breakOnErrorHit = breakOnErrorHit; - this.lastPartition = lastPartition; - this.partitionLastOffset = partitionLastOffset; this.failed = failed; } @@ -42,14 +34,6 @@ public final class ProcessingResult { return breakOnErrorHit; } - public long getPartitionLastOffset() { - return partitionLastOffset; - } - - public long getPartition() { - return lastPartition; - } - public boolean isFailed() { return failed; }