This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push:
new 9a556738f24 CAMEL-20044: add extra logging for BreakOnFirstError (#11920)
9a556738f24 is described below
commit 9a556738f242b8648a9fc1dea531907f26ca3ce2
Author: Mike Barlotta <[email protected]>
AuthorDate: Wed Nov 8 12:51:25 2023 -0500
CAMEL-20044: add extra logging for BreakOnFirstError (#11920)
* add extra logging for KafkaRecordProcessor
* add extra logging for KafkaFetchRecords related to breakOnFirstError
* add more logging
* tweak logging
* fix the unexpected check
* fix logging to show partition and offset correctly
* address some PR comments
* update logging per PR comment
* update formatting and various props files
---------
Co-authored-by: 4741446 <[email protected]>
---
.../camel/component/kafka/KafkaFetchRecords.java | 32 +++++++++++++++
.../kafka/consumer/AbstractCommitManager.java | 2 +
.../consumer/support/KafkaRecordProcessor.java | 45 ++++++++++++++++------
.../support/KafkaRecordProcessorFacade.java | 3 ++
.../kafka/consumer/support/ProcessingResult.java | 13 ++++++-
5 files changed, 81 insertions(+), 14 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 610fab4f9ec..a213dbc1fe8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -316,7 +316,9 @@ public class KafkaFetchRecords implements Runnable {
kafkaConsumer, threadId, commitManager, consumerListener);
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+
ProcessingResult lastResult = null;
+
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() &&
pollExceptionStrategy.canContinue()) {
ConsumerRecords<Object, Object> allRecords =
consumer.poll(pollDuration);
if (consumerListener != null) {
@@ -325,7 +327,32 @@ public class KafkaFetchRecords implements Runnable {
}
}
+ if (lastResult != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("This polling iteration is using lastresult
on partition {} and offset {}",
+ lastResult.getPartition(),
lastResult.getPartitionLastOffset());
+ }
+
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("This polling iteration is using lastresult
of null");
+ }
+ }
+
ProcessingResult result =
recordProcessorFacade.processPolledRecords(allRecords, lastResult);
+
+ if (result != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("This polling iteration had a result
returned for partition {} and offset {}",
+ result.getPartition(),
result.getPartitionLastOffset());
+ }
+
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("This polling iteration had a result
returned as null");
+ }
+ }
+
updateTaskState();
if (result.isBreakOnErrorHit() &&
!this.state.equals(State.PAUSED)) {
LOG.debug("We hit an error ... setting flags to force
reconnect");
@@ -334,6 +361,11 @@ public class KafkaFetchRecords implements Runnable {
setConnected(false);
} else {
lastResult = result;
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("setting lastresult to partition {} and
offset {}",
+ lastResult.getPartition(),
lastResult.getPartitionLastOffset());
+ }
}
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
index fe5abc3e403..a1da9823be4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractCommitManager implements CommitManager {
public static final long START_OFFSET = -1;
+ public static final long NON_PARTITION = -1;
+
private static final Logger LOG =
LoggerFactory.getLogger(AbstractCommitManager.class);
protected final KafkaConsumer kafkaConsumer;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 1afe53cbe2b..97875b097f1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -63,6 +63,9 @@ public class KafkaRecordProcessor {
message.setHeader(KafkaConstants.KEY, record.key());
}
+ LOG.debug("setting up the exchange for message from partition {} and
offset {}",
+ record.partition(), record.offset());
+
message.setBody(record.value());
}
@@ -82,7 +85,7 @@ public class KafkaRecordProcessor {
}
public ProcessingResult processExchange(
- Exchange exchange, TopicPartition partition, boolean
partitionHasNext,
+ Exchange exchange, TopicPartition topicPartition, boolean
partitionHasNext,
boolean recordHasNext, ConsumerRecord<Object, Object> record,
ProcessingResult lastResult,
ExceptionHandler exceptionHandler) {
@@ -100,7 +103,7 @@ public class KafkaRecordProcessor {
if (configuration.isAllowManualCommit()) {
// allow Camel users to access the Kafka consumer API to be able
to do for example manual commits
- KafkaManualCommit manual = commitManager.getManualCommit(exchange,
partition, record);
+ KafkaManualCommit manual = commitManager.getManualCommit(exchange,
topicPartition, record);
message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext
&& !partitionHasNext);
@@ -112,30 +115,48 @@ public class KafkaRecordProcessor {
exchange.setException(e);
}
if (exchange.getException() != null) {
- boolean breakOnErrorExit = processException(exchange, partition,
lastResult.getPartitionLastOffset(),
+
+ LOG.debug("An exception was thrown for record at partition {} and
offset {}",
+ record.partition(), record.offset());
+
+ boolean breakOnErrorExit = processException(exchange,
topicPartition, record, lastResult,
exceptionHandler);
- return new ProcessingResult(breakOnErrorExit,
lastResult.getPartitionLastOffset(), true);
+
+ return new ProcessingResult(breakOnErrorExit,
lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
} else {
- return new ProcessingResult(false, record.offset(),
exchange.getException() != null);
+ return new ProcessingResult(false, record.partition(),
record.offset(), exchange.getException() != null);
}
}
private boolean processException(
- Exchange exchange, TopicPartition partition, long
partitionLastOffset,
+ Exchange exchange, TopicPartition topicPartition,
+ ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
ExceptionHandler exceptionHandler) {
// processing failed due to an unhandled exception, what should we do
if (configuration.isBreakOnFirstError()) {
+
+ if (lastResult.getPartition() != -1 &&
+ lastResult.getPartition() != record.partition()) {
+ LOG.error("About to process an exception with UNEXPECTED
partition & offset. Got topic partition {}. " +
+ " The last result was on partition {} with offset {}
but was expecting partition {} with offset {}",
+ topicPartition.partition(), lastResult.getPartition(),
lastResult.getPartitionLastOffset(),
+ record.partition(), record.offset());
+ }
+
// we are failing and we should break out
if (LOG.isWarnEnabled()) {
- LOG.warn("Error during processing {} from topic: {}",
exchange, partition.topic(), exchange.getException());
- LOG.warn("Will seek consumer to offset {} and start polling
again.", partitionLastOffset);
+ Exception exc = exchange.getException();
+ LOG.warn("Error during processing {} from topic: {} due to
{}", exchange, topicPartition.topic(),
+ exc.getMessage());
+ LOG.warn("Will seek consumer to offset {} on partition {} and
start polling again.",
+ lastResult.getPartitionLastOffset(),
lastResult.getPartition());
}
- // force commit, so we resume on next poll where we failed except
when the failure happened
- // at the first message in a poll
- if (partitionLastOffset != AbstractCommitManager.START_OFFSET) {
- commitManager.forceCommit(partition, partitionLastOffset);
+ // force commit, so we resume on next poll where we failed
+ // except when the failure happened at the first message in a poll
+ if (lastResult.getPartitionLastOffset() !=
AbstractCommitManager.START_OFFSET) {
+ commitManager.forceCommit(topicPartition,
lastResult.getPartitionLastOffset());
}
// continue to next partition
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index fbf6f3d09a8..134246891fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -78,6 +78,9 @@ public class KafkaRecordProcessorFacade {
lastResult = processRecord(partition,
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
kafkaRecordProcessor, record);
+ LOG.debug("Processed record on partition {} and offset {} and
got result for partition {} and offset {}",
+ record.partition(), record.offset(),
lastResult.getPartition(), lastResult.getPartitionLastOffset());
+
if (consumerListener != null) {
if (!consumerListener.afterProcess(lastResult)) {
commitManager.commit(partition);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 36f0c69c8b2..fe3afd6ee8d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -21,14 +21,19 @@ import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
public final class ProcessingResult {
private static final ProcessingResult UNPROCESSED_RESULT
- = new ProcessingResult(false, AbstractCommitManager.START_OFFSET,
false);
+ = new ProcessingResult(
+ false,
+ AbstractCommitManager.NON_PARTITION,
+ AbstractCommitManager.START_OFFSET, false);
private final boolean breakOnErrorHit;
+ private final long lastPartition;
private final long partitionLastOffset;
private final boolean failed;
- ProcessingResult(boolean breakOnErrorHit, long partitionLastOffset,
boolean failed) {
+ ProcessingResult(boolean breakOnErrorHit, long lastPartition, long
partitionLastOffset, boolean failed) {
this.breakOnErrorHit = breakOnErrorHit;
+ this.lastPartition = lastPartition;
this.partitionLastOffset = partitionLastOffset;
this.failed = failed;
}
@@ -41,6 +46,10 @@ public final class ProcessingResult {
return partitionLastOffset;
}
+ public long getPartition() {
+ return lastPartition;
+ }
+
public boolean isFailed() {
return failed;
}