This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit acf7e0b98813837adf263a727345df88ac87dea5
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Thu Feb 1 11:43:30 2024 +0100

    CAMEL-20380: added and/or improved logging messages
    
    This should simplify debugging the issue
---
 .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java     | 5 ++++-
 .../kafka/integration/batching/BatchingProcessingITSupport.java      | 1 +
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index dccec83e00c..8de8517ea1e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -352,7 +352,8 @@ public class KafkaFetchRecords implements Runnable {
 
             safeUnsubscribe();
         } catch (InterruptException e) {
-            kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic",
+            kafkaConsumer.getExceptionHandler().handleException(
+                    "Thread " + threadId + " interrupted while consuming from kafka topic",
                     e);
             commitManager.commit();
 
@@ -512,8 +513,10 @@ public class KafkaFetchRecords implements Runnable {
             }
 
             // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop
+            LOG.trace("Waking up Kafka consumer");
             consumer.wakeup();
         } catch (InterruptedException e) {
+            LOG.trace("Interrupted while waiting for processing to finish: waking up Kafka consumer");
             consumer.wakeup();
             Thread.currentThread().interrupt();
         } finally {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
index f50aef3fd99..a54ccb5464f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
@@ -92,6 +92,7 @@ abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport
 
         // Fourth step: We start again our route, since we have been committing the offsets from the first step,
         // we will expect to consume from the latest committed offset (e.g.: from offset 5()
+        LOG.debug("Starting the fourth step");
         contextExtension.getContext().getRouteController().startRoute("batching");
         setupPostExecutionExpectations();
 

Reply via email to