This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new dc5b5fd  Code cleanup
     new 2095b4b  Merge pull request #184 from fvaleri/cleanup
dc5b5fd is described below

commit dc5b5fdb189c8402535d41695320b508f870e9f6
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Sun May 3 17:59:51 2020 +0200

    Code cleanup
---
 .../camel/kafkaconnector/CamelSinkConnector.java   |  6 +-
 .../kafkaconnector/CamelSinkConnectorConfig.java   | 13 +++--
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 18 +++---
 .../kafkaconnector/CamelSourceConnectorConfig.java | 26 +++++----
 .../camel/kafkaconnector/CamelSourceTask.java      |  5 +-
 .../kafkaconnector/utils/CamelMainSupport.java     | 26 ++++-----
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 65 +++++++++++-----------
 .../maven/AbstractCamelKafkaConnectorMojo.java     |  2 +-
 8 files changed, 80 insertions(+), 81 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
index 7cc47f0..092e513 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CamelSinkConnector extends SinkConnector {
-    private static Logger log = 
LoggerFactory.getLogger(CamelSinkConnector.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkConnector.class);
 
     private Map<String, String> configProps;
 
@@ -38,7 +38,7 @@ public class CamelSinkConnector extends SinkConnector {
 
     @Override
     public void start(Map<String, String> configProps) {
-        log.info("Connector config keys: {}", String.join(", ", 
configProps.keySet()));
+        LOG.info("Connector config keys: {}", String.join(", ", 
configProps.keySet()));
         this.configProps = configProps;
     }
 
@@ -49,7 +49,7 @@ public class CamelSinkConnector extends SinkConnector {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        log.info("Setting task configurations for {} workers.", maxTasks);
+        LOG.info("Setting task configurations for {} workers.", maxTasks);
         final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
         for (int i = 0; i < maxTasks; ++i) {
             configs.add(configProps);
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index d59d342..e11e84d 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -38,18 +38,21 @@ public class CamelSinkConnectorConfig extends 
AbstractConfig {
     public static final String CAMEL_SINK_URL_DOC = "The camel url to 
configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
             + " and all the properties starting with " + 
CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + 
CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
 
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, 
Importance.HIGH, CAMEL_SINK_URL_DOC)
+        .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, 
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
+        .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, 
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
     }
 
     public CamelSinkConnectorConfig(Map<String, String> parsedConfig) {
-        this(conf(), parsedConfig);
+        this(CONFIG_DEF, parsedConfig);
     }
 
     public static ConfigDef conf() {
-        return new ConfigDef()
-                .define(CAMEL_SINK_URL_CONF, Type.STRING, 
CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
-                .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, 
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
-                .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, 
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+        return CONFIG_DEF;
     }
+
 }
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 9785215..baa6665 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,11 +19,9 @@ package org.apache.camel.kafkaconnector;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
@@ -44,7 +42,7 @@ public class CamelSinkTask extends SinkTask {
     private static final String CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX = 
"camel.sink.endpoint.";
     private static final String CAMEL_SINK_PATH_PROPERTIES_PREFIX = 
"camel.sink.path.";
 
-    private static Logger log = LoggerFactory.getLogger(CamelSinkTask.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkTask.class);
 
     private static final String LOCAL_URL = "direct:start";
     private static final String HEADER_CAMEL_PREFIX = "CamelHeader";
@@ -62,7 +60,7 @@ public class CamelSinkTask extends SinkTask {
     @Override
     public void start(Map<String, String> props) {
         try {
-            log.info("Starting CamelSinkTask connector task");
+            LOG.info("Starting CamelSinkTask connector task");
             Map<String, String> actualProps = 
TaskHelper.mergeProperties(getDefaultConfig(), props);
             config = getCamelSinkConnectorConfig(actualProps);
 
@@ -78,7 +76,7 @@ public class CamelSinkTask extends SinkTask {
             producer = cms.createProducerTemplate();
 
             cms.start();
-            log.info("CamelSinkTask connector task started");
+            LOG.info("CamelSinkTask connector task started");
         } catch (Exception e) {
             throw new ConnectException("Failed to create and start Camel 
context", e);
         }
@@ -116,20 +114,20 @@ public class CamelSinkTask extends SinkTask {
             }
             exchange.getMessage().setHeaders(headers);
             exchange.getMessage().setBody(record.value());
-            log.debug("Sending {} to {}", exchange, LOCAL_URL);
+            LOG.debug("Sending {} to {}", exchange, LOCAL_URL);
             producer.send(LOCAL_URL, exchange);
         }
     }
 
     @Override
     public void stop() {
+        LOG.info("Stopping CamelSinkTask connector task");
         try {
-            log.info("Stopping CamelSinkTask connector task");
             cms.stop();
         } catch (Exception e) {
             throw new ConnectException("Failed to stop Camel context", e);
         } finally {
-            log.info("CamelSinkTask connector task stopped");
+            LOG.info("CamelSinkTask connector task stopped");
         }
     }
 
@@ -157,7 +155,7 @@ public class CamelSinkTask extends SinkTask {
             map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
 {
             map.put(singleHeader.key(), (List<?>)singleHeader.value());
-        } 
+        }
     }
 
     private void addProperty(Exchange exchange, Header singleHeader) {
@@ -184,7 +182,7 @@ public class CamelSinkTask extends SinkTask {
             exchange.getProperties().put(singleHeader.key(), (Map<?, 
?>)singleHeader.value());
         } else if 
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
 {
             exchange.getProperties().put(singleHeader.key(), 
(List<?>)singleHeader.value());
-        } 
+        }
     }
 
     public CamelMainSupport getCms() {
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 930ae8b..3de49dd 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -67,25 +67,27 @@ public class CamelSourceConnectorConfig extends 
AbstractConfig {
     public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name 
of a camel message header containing an unique key that can be used as a Kafka 
message key."
           +  " If this is not specified, then the Kafka message will not have 
a key.";
 
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, 
Importance.HIGH, CAMEL_SOURCE_URL_DOC)
+        .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, 
CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
+        .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, 
ConfigDef.Importance.HIGH, TOPIC_DOC)
+        .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, 
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
+        .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, 
CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+        .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, 
CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
+        .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
+        .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, 
Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, 
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
+        .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, 
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
+        .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, 
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
     }
 
     public CamelSourceConnectorConfig(Map<String, String> parsedConfig) {
-        this(conf(), parsedConfig);
+        this(CONFIG_DEF, parsedConfig);
     }
 
     public static ConfigDef conf() {
-        return new ConfigDef()
-                .define(CAMEL_SOURCE_URL_CONF, Type.STRING, 
CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
-                .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, 
CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
-                .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, 
ConfigDef.Importance.HIGH, TOPIC_DOC)
-                .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, 
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
-                .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, 
CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, 
Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, 
Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, 
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, 
Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, 
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
-                .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, 
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, 
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
-                .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, 
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+        return CONFIG_DEF;
     }
 }
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 5025296..83ea1b1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -24,7 +24,6 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -91,7 +90,7 @@ public class CamelSourceTask extends SourceTask {
             Endpoint endpoint = cms.getEndpoint(localUrl);
             consumer = endpoint.createPollingConsumer();
             consumer.start();
-            
+
             cms.start();
             LOG.info("CamelSourceTask connector task started");
         } catch (Exception e) {
@@ -143,7 +142,7 @@ public class CamelSourceTask extends SourceTask {
         }
 
         if (records.isEmpty()) {
-            return null;
+            return Collections.EMPTY_LIST;
         } else {
             return records;
         }
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index ddfd1e2..91735f2 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 
 public class CamelMainSupport {
     public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = 
"camel.dataformat.";
-    private static Logger log = 
LoggerFactory.getLogger(CamelMainSupport.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelMainSupport.class);
 
     private Main camelMain;
     private CamelContext camel;
@@ -83,7 +83,7 @@ public class CamelMainSupport {
         Properties camelProperties = new OrderedProperties();
         camelProperties.putAll(orderedProps);
 
-        log.info("Setting initial properties in Camel context: [{}]", 
camelProperties);
+        LOG.info("Setting initial properties in Camel context: [{}]", 
camelProperties);
         
this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
 
         //creating the actual route
@@ -93,15 +93,15 @@ public class CamelMainSupport {
                 if (marshal != null && unmarshal != null) {
                     throw new UnsupportedOperationException("Uses of both 
marshal (i.e. " + marshal + ") and unmarshal (i.e. " + unmarshal + ") is not 
supported");
                 } else if (marshal != null) {
-                    log.info("Creating Camel route 
from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
+                    LOG.info("Creating Camel route 
from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
                     camel.getRegistry().bind(marshal, 
lookupAndInstantiateDataformat(marshal));
                     rd.marshal().custom(marshal);
                 } else if (unmarshal != null) {
-                    log.info("Creating Camel route 
from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
+                    LOG.info("Creating Camel route 
from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
                     camel.getRegistry().bind(unmarshal, 
lookupAndInstantiateDataformat(unmarshal));
                     rd.unmarshal().custom(unmarshal);
                 } else {
-                    log.info("Creating Camel route from({}).to({})", fromUrl, 
toUrl);
+                    LOG.info("Creating Camel route from({}).to({})", fromUrl, 
toUrl);
                 }
                 rd.to(toUrl);
             }
@@ -109,27 +109,27 @@ public class CamelMainSupport {
     }
 
     public void start() throws Exception {
-        log.info("Starting CamelContext");
+        LOG.info("Starting CamelContext");
 
         CamelContextStarter starter = new CamelContextStarter();
         exService.execute(starter);
         startFinishedSignal.await();
 
         if (starter.hasException()) {
-            log.info("CamelContext failed to start", starter.getException());
+            LOG.info("CamelContext failed to start", starter.getException());
             throw starter.getException();
         }
 
-        log.info("CamelContext started");
+        LOG.info("CamelContext started");
     }
 
     public void stop() {
-        log.info("Stopping CamelContext");
+        LOG.info("Stopping CamelContext");
 
         camelMain.stop();
         exService.shutdown();
 
-        log.info("CamelContext stopped");
+        LOG.info("CamelContext stopped");
     }
 
     public ProducerTemplate createProducerTemplate() {
@@ -190,7 +190,7 @@ public class CamelMainSupport {
 
         @Override
         public void afterStart(BaseMainSupport main) {
-            log.trace("Signaling CamelContext startup is finished 
(startFinishedSignal.countDown();) due to CamelMainFinishedListener been 
called");
+            LOG.trace("Signaling CamelContext startup is finished 
(startFinishedSignal.countDown();) due to CamelMainFinishedListener been 
called");
             startFinishedSignal.countDown();
         }
 
@@ -217,10 +217,10 @@ public class CamelMainSupport {
             try {
                 camelMain.run();
             } catch (Exception e) {
-                log.error("An exception has occurred before CamelContext 
startup has finished", e);
+                LOG.error("An exception has occurred before CamelContext 
startup has finished", e);
                 startException = e;
                 if (startFinishedSignal.getCount() > 0) {
-                    log.trace("Signaling CamelContext startup is finished 
(startFinishedSignal.countDown();) due to an exception");
+                    LOG.trace("Signaling CamelContext startup is finished 
(startFinishedSignal.countDown();) due to an exception");
                     startFinishedSignal.countDown();
                 }
             }
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 934b78d..89205ae 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,20 +16,15 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.camel.ConsumerTemplate;
-import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
-import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 
@@ -40,16 +35,18 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class CamelSourceTaskTest {
 
+    private static final String TIMER_URI = 
"timer:kafkaconnector?period=10&fixedRate=true&delay=0";
+
     @Test
     public void testSourcePolling() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2100L);
+        Thread.sleep(11L);
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(2, poll.size());
         assertEquals("mytopic", poll.get(0).topic());
@@ -62,9 +59,9 @@ public class CamelSourceTaskTest {
                 break;
             }
         }
-        assertTrue(containsHeader);
 
         camelSourceTask.stop();
+        assertTrue(containsHeader);
     }
 
     @Test
@@ -82,7 +79,7 @@ public class CamelSourceTaskTest {
         // first we test if we have a key in the message with body
         template.sendBodyAndHeader("direct:start", "awesome!", 
"CamelSpecialTestKey", 1234);
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -92,7 +89,7 @@ public class CamelSourceTaskTest {
         // second we test if we have no key under the header
         template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 
1234);
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -102,7 +99,7 @@ public class CamelSourceTaskTest {
         // third we test if we have the header but with null value
         template.sendBodyAndHeader("direct:start", "awesome!", 
"CamelSpecialTestKey", null);
 
-        Thread.sleep(100L);
+        Thread.sleep(10L);
 
         camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -126,7 +123,7 @@ public class CamelSourceTaskTest {
         // send first data
         template.sendBody("direct:start", "testing kafka connect");
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -138,7 +135,7 @@ public class CamelSourceTaskTest {
         // send second data
         template.sendBody("direct:start", true);
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -150,7 +147,7 @@ public class CamelSourceTaskTest {
         // second third data
         template.sendBody("direct:start", 1234L);
 
-        Thread.sleep(100L);
+        Thread.sleep(10L);
 
         poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -162,7 +159,7 @@ public class CamelSourceTaskTest {
         // third with null data
         template.sendBody("direct:start", null);
 
-        Thread.sleep(100L);
+        Thread.sleep(10L);
         poll = camelSourceTask.poll();
         assertNull(poll.get(0).key());
         assertNull(poll.get(0).keySchema());
@@ -175,50 +172,48 @@ public class CamelSourceTaskTest {
     @Test
     public void testSourcePollingTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
         props.put("camel.source.maxPollDuration", "1");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(3000L);
+        long sleepTime = 30L;
+        Thread.sleep(sleepTime);
         List<SourceRecord> poll;
         int retries = 3;
         do {
             poll = camelSourceTask.poll();
             if (poll == null) {
                 retries--;
-
                 if (retries == 0) {
                     fail("Exhausted the maximum retries and no record was 
returned");
                 }
-
-                Thread.sleep(3000L);
+                Thread.sleep(sleepTime);
             }
         } while (poll == null && retries > 0);
 
         assertTrue(poll.size() >= 1, "Received messages are: " + poll.size() + 
", expected between 1 and 2.");
         assertTrue(poll.size() <= 2, "Received messages are: " + poll.size() + 
", expected between 1 and 2.");
-
         camelSourceTask.stop();
     }
 
     @Test
     public void testSourcePollingMaxRecordNumber() throws InterruptedException 
{
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
         props.put("camel.source.maxBatchPollSize", "1");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2000L);
+        Thread.sleep(11L);
         List<SourceRecord> poll = camelSourceTask.poll();
-        assertEquals(1, poll.size());
-
         camelSourceTask.stop();
+
+        assertEquals(1, poll.size());
     }
 
     @Test
@@ -243,9 +238,9 @@ public class CamelSourceTaskTest {
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() throws 
JsonProcessingException, InterruptedException {
+    public void testUrlPrecedenceOnComponentProperty() throws 
InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"shouldNotBeUsed");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"endpointProperty", "shouldNotBeUsed");
@@ -254,7 +249,7 @@ public class CamelSourceTaskTest {
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2100L);
+        Thread.sleep(11L);
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(2, poll.size());
         assertEquals("mytopic", poll.get(0).topic());
@@ -267,13 +262,13 @@ public class CamelSourceTaskTest {
                 break;
             }
         }
-        assertTrue(containsHeader);
-
         camelSourceTask.stop();
+
+        assertTrue(containsHeader);
     }
 
     @Test
-    public void testSourcePollingUsingComponentProperty() throws 
JsonProcessingException, InterruptedException {
+    public void testSourcePollingUsingComponentProperty() throws 
InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
@@ -298,13 +293,14 @@ public class CamelSourceTaskTest {
         }
         assertTrue(containsHeader);
 
-        assertEquals(1, 
camelSourceTask.getCms().getEndpoints().stream().filter(e -> 
e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
+        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+            .filter(e -> 
e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
 
         camelSourceTask.stop();
     }
 
     @Test
-    public void testSourcePollingUsingMultipleComponentProperties() throws 
JsonProcessingException, InterruptedException {
+    public void testSourcePollingUsingMultipleComponentProperties() throws 
InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
@@ -330,7 +326,8 @@ public class CamelSourceTaskTest {
         }
         assertTrue(containsHeader);
 
-        assertEquals(1, 
camelSourceTask.getCms().getEndpoints().stream().filter(e -> 
e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
+        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+            .filter(e -> 
e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
 
         camelSourceTask.stop();
     }
diff --git 
a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
 
b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
index aeb2da1..d1eef84 100644
--- 
a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
+++ 
b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
@@ -112,7 +112,7 @@ public abstract class AbstractCamelKafkaConnectorMojo 
extends AbstractMojo {
     public void execute() throws MojoExecutionException, MojoFailureException {
         configureResourceManager();
         if (!project.getArtifactId().equals(connectorsProjectName)) {
-            getLog().debug("Skipping porject " + project.getArtifactId() + " 
since it is not " + connectorsProjectName + " can be configured with 
<connectors-project-name> option.");
+            getLog().debug("Skipping project " + project.getArtifactId() + " 
since it is not " + connectorsProjectName + " can be configured with 
<connectors-project-name> option.");
             return;
         }
         try {

Reply via email to