This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit acf7e0b98813837adf263a727345df88ac87dea5 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Thu Feb 1 11:43:30 2024 +0100 CAMEL-20380: added and/or improved logging messages This should simplify debugging the issue --- .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 5 ++++- .../kafka/integration/batching/BatchingProcessingITSupport.java | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index dccec83e00c..8de8517ea1e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -352,7 +352,8 @@ public class KafkaFetchRecords implements Runnable { safeUnsubscribe(); } catch (InterruptException e) { - kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", + kafkaConsumer.getExceptionHandler().handleException( + "Thread " + threadId + " interrupted while consuming from kafka topic", e); commitManager.commit(); @@ -512,8 +513,10 @@ public class KafkaFetchRecords implements Runnable { } // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop + LOG.trace("Waking up Kafka consumer"); consumer.wakeup(); } catch (InterruptedException e) { + LOG.trace("Interrupted while waiting for processing to finish: waking up Kafka consumer"); consumer.wakeup(); Thread.currentThread().interrupt(); } finally { diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java index f50aef3fd99..a54ccb5464f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java @@ -92,6 +92,7 @@ abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport // Fourth step: We start again our route, since we have been committing the offsets from the first step, // we will expect to consume from the latest committed offset (e.g.: from offset 5() + LOG.debug("Starting the fourth step"); contextExtension.getContext().getRouteController().startRoute("batching"); setupPostExecutionExpectations();