This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-kafka-connector-0.7.x in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 0102f6e98fc6b58774a50b1ccb07e6eef3c88fbf Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Thu Feb 4 23:12:27 2021 +0100 fixed #980 : camel.source.contentLogLevel config not honored in source connectors --- .../org/apache/camel/kafkaconnector/CamelSinkTask.java | 14 +++++++++++--- .../org/apache/camel/kafkaconnector/CamelSourceTask.java | 15 +++++++++++---- .../apache/camel/kafkaconnector/CamelSinkTaskTest.java | 13 +++++++++++++ 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 9352ed6..82c16d2 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -54,10 +54,10 @@ public class CamelSinkTask extends SinkTask { private static final String LOCAL_URL = "direct:start"; private ErrantRecordReporter reporter; - private CamelKafkaConnectMain cms; private ProducerTemplate producer; private Endpoint localEndpoint; + private LoggingLevel loggingLevel = LoggingLevel.OFF; private boolean mapProperties; private boolean mapHeaders; @@ -83,11 +83,11 @@ public class CamelSinkTask extends SinkTask { } } + String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); try { - String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase()); } catch (Exception e) { - LOG.debug("Invalid value for {} property", CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); + LOG.debug("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); } String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF); @@ -239,4 +239,12 @@ public class CamelSinkTask extends SinkTask { CamelKafkaConnectMain getCms() { return cms; } + + public LoggingLevel getLoggingLevel() { + return loggingLevel; + } + + public void setLoggingLevel(LoggingLevel loggingLevel) { + this.loggingLevel = loggingLevel; + } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index feb979d..58a41ad 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -86,11 +86,11 @@ public class CamelSourceTask extends SourceTask { Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props); CamelSourceConnectorConfig config = getCamelSourceConnectorConfig(actualProps); + String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); try { - String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); - loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase()); + loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase()); } catch (Exception e) { - LOG.debug("Invalid value for {} property", CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); + LOG.error("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); } maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF); @@ -181,7 +181,6 @@ public class CamelSourceTask extends SourceTask { return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli); } - @Override public synchronized List<SourceRecord> poll() { LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size()); @@ -366,4 +365,12 @@ public class CamelSourceTask extends SourceTask { CamelKafkaConnectMain getCms() { return cms; } + + public LoggingLevel getLoggingLevel() { + return loggingLevel; + } + + public void setLoggingLevel(LoggingLevel loggingLevel) { + this.loggingLevel = loggingLevel; + } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 5aaca7f..bab0a5d 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -1111,4 +1111,17 @@ public class CamelSinkTaskTest { } } + @Test + public void testContentLogLevelConfiguration() { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO"); + + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel()); + + sinkTask.stop(); + } }