This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0102f6e98fc6b58774a50b1ccb07e6eef3c88fbf
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Thu Feb 4 23:12:27 2021 +0100

    fixed #980 : camel.source.contentLogLevel config not honored in source 
connectors
---
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java    | 14 +++++++++++---
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java  | 15 +++++++++++----
 .../apache/camel/kafkaconnector/CamelSinkTaskTest.java    | 13 +++++++++++++
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 9352ed6..82c16d2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -54,10 +54,10 @@ public class CamelSinkTask extends SinkTask {
     private static final String LOCAL_URL = "direct:start";
     private ErrantRecordReporter reporter;
 
-
     private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
     private Endpoint localEndpoint;
+
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
     private boolean mapProperties;
     private boolean mapHeaders;
@@ -83,11 +83,11 @@ public class CamelSinkTask extends SinkTask {
                 }
             }
 
+            String levelStr = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
                 loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", 
CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+                LOG.debug("Invalid value {} for {} property", 
levelStr.toUpperCase(), 
CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             }
 
             String remoteUrl = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
@@ -239,4 +239,12 @@ public class CamelSinkTask extends SinkTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index feb979d..58a41ad 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -86,11 +86,11 @@ public class CamelSourceTask extends SourceTask {
             Map<String, String> actualProps = 
TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
             CamelSourceConnectorConfig config = 
getCamelSourceConnectorConfig(actualProps);
 
+            String levelStr = 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
-                loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase());
+                loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", 
CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
+                LOG.error("Invalid value {} for {} property", 
levelStr.toUpperCase(), 
CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             }
 
             maxBatchPollSize = 
config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
@@ -181,7 +181,6 @@ public class CamelSourceTask extends SourceTask {
         return maxPollDuration - (Instant.now().toEpochMilli() - 
startPollEpochMilli);
     }
 
-
     @Override
     public synchronized List<SourceRecord> poll() {
         LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() 
- freeSlots.size());
@@ -366,4 +365,12 @@ public class CamelSourceTask extends SourceTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 5aaca7f..bab0a5d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -1111,4 +1111,17 @@ public class CamelSinkTaskTest {
         }
     }
 
+    @Test
+    public void testContentLogLevelConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, 
"INFO");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+        assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+
+        sinkTask.stop();
+    }
 }

Reply via email to