This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new dc5b5fd Code cleanup new 2095b4b Merge pull request #184 from fvaleri/cleanup dc5b5fd is described below commit dc5b5fdb189c8402535d41695320b508f870e9f6 Author: Federico Valeri <fvaleri@localhost> AuthorDate: Sun May 3 17:59:51 2020 +0200 Code cleanup --- .../camel/kafkaconnector/CamelSinkConnector.java | 6 +- .../kafkaconnector/CamelSinkConnectorConfig.java | 13 +++-- .../apache/camel/kafkaconnector/CamelSinkTask.java | 18 +++--- .../kafkaconnector/CamelSourceConnectorConfig.java | 26 +++++---- .../camel/kafkaconnector/CamelSourceTask.java | 5 +- .../kafkaconnector/utils/CamelMainSupport.java | 26 ++++----- .../camel/kafkaconnector/CamelSourceTaskTest.java | 65 +++++++++++----------- .../maven/AbstractCamelKafkaConnectorMojo.java | 2 +- 8 files changed, 80 insertions(+), 81 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java index 7cc47f0..092e513 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CamelSinkConnector extends SinkConnector { - private static Logger log = LoggerFactory.getLogger(CamelSinkConnector.class); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkConnector.class); private Map<String, String> configProps; @@ -38,7 +38,7 @@ public class CamelSinkConnector extends SinkConnector { @Override public void start(Map<String, String> configProps) { - log.info("Connector config keys: {}", String.join(", ", configProps.keySet())); + LOG.info("Connector config keys: {}", String.join(", ", configProps.keySet())); this.configProps = configProps; } @@ -49,7 +49,7 @@ public class CamelSinkConnector extends SinkConnector { @Override public 
List<Map<String, String>> taskConfigs(int maxTasks) { - log.info("Setting task configurations for {} workers.", maxTasks); + LOG.info("Setting task configurations for {} workers.", maxTasks); final List<Map<String, String>> configs = new ArrayList<>(maxTasks); for (int i = 0; i < maxTasks; ++i) { configs.add(configProps); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index d59d342..e11e84d 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -38,18 +38,21 @@ public class CamelSinkConnectorConfig extends AbstractConfig { public static final String CAMEL_SINK_URL_DOC = "The camel url to configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF + " and all the properties starting with " + CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + CAMEL_SINK_COMPONENT_CONF + " value> are ignored."; + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC) + .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC) + .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC); + public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); } public CamelSinkConnectorConfig(Map<String, String> parsedConfig) { - this(conf(), parsedConfig); + this(CONFIG_DEF, parsedConfig); } public static ConfigDef conf() { - return new ConfigDef() - .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC) - .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, 
CAMEL_SINK_MARSHAL_DOC) - .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC); + return CONFIG_DEF; } + } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 9785215..baa6665 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -19,11 +19,9 @@ package org.apache.camel.kafkaconnector; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; @@ -44,7 +42,7 @@ public class CamelSinkTask extends SinkTask { private static final String CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX = "camel.sink.endpoint."; private static final String CAMEL_SINK_PATH_PROPERTIES_PREFIX = "camel.sink.path."; - private static Logger log = LoggerFactory.getLogger(CamelSinkTask.class); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class); private static final String LOCAL_URL = "direct:start"; private static final String HEADER_CAMEL_PREFIX = "CamelHeader"; @@ -62,7 +60,7 @@ public class CamelSinkTask extends SinkTask { @Override public void start(Map<String, String> props) { try { - log.info("Starting CamelSinkTask connector task"); + LOG.info("Starting CamelSinkTask connector task"); Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props); config = getCamelSinkConnectorConfig(actualProps); @@ -78,7 +76,7 @@ public class CamelSinkTask extends SinkTask { producer = cms.createProducerTemplate(); cms.start(); - log.info("CamelSinkTask connector task started"); + LOG.info("CamelSinkTask connector task started"); } catch (Exception e) { throw 
new ConnectException("Failed to create and start Camel context", e); } @@ -116,20 +114,20 @@ public class CamelSinkTask extends SinkTask { } exchange.getMessage().setHeaders(headers); exchange.getMessage().setBody(record.value()); - log.debug("Sending {} to {}", exchange, LOCAL_URL); + LOG.debug("Sending {} to {}", exchange, LOCAL_URL); producer.send(LOCAL_URL, exchange); } } @Override public void stop() { + LOG.info("Stopping CamelSinkTask connector task"); try { - log.info("Stopping CamelSinkTask connector task"); cms.stop(); } catch (Exception e) { throw new ConnectException("Failed to stop Camel context", e); } finally { - log.info("CamelSinkTask connector task stopped"); + LOG.info("CamelSinkTask connector task stopped"); } } @@ -157,7 +155,7 @@ public class CamelSinkTask extends SinkTask { map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) { map.put(singleHeader.key(), (List<?>)singleHeader.value()); - } + } } private void addProperty(Exchange exchange, Header singleHeader) { @@ -184,7 +182,7 @@ public class CamelSinkTask extends SinkTask { exchange.getProperties().put(singleHeader.key(), (Map<?, ?>)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) { exchange.getProperties().put(singleHeader.key(), (List<?>)singleHeader.value()); - } + } } public CamelMainSupport getCms() { diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 930ae8b..3de49dd 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -67,25 +67,27 @@ public class CamelSourceConnectorConfig extends AbstractConfig { public static 
final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name of a camel message header containing an unique key that can be used as a Kafka message key." + " If this is not specified, then the Kafka message will not have a key."; + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC) + .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC) + .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC) + .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC) + .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC) + .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC) + .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC) + .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC) + .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC); + public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); } public CamelSourceConnectorConfig(Map<String, String> parsedConfig) { 
- this(conf(), parsedConfig); + this(CONFIG_DEF, parsedConfig); } public static ConfigDef conf() { - return new ConfigDef() - .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC) - .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC) - .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC) - .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC) - .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC) - .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC) - .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC) - .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC) - .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC) - .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC); + return CONFIG_DEF; } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 5025296..83ea1b1 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -24,7 +24,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashSet; import java.util.List; import java.util.Map; @@ -91,7 +90,7 @@ public class CamelSourceTask extends SourceTask { Endpoint endpoint = cms.getEndpoint(localUrl); consumer = endpoint.createPollingConsumer(); consumer.start(); - + cms.start(); LOG.info("CamelSourceTask connector task started"); } catch (Exception e) { @@ -143,7 +142,7 @@ public class CamelSourceTask extends SourceTask { } if (records.isEmpty()) { - return null; + return Collections.EMPTY_LIST; } else { return records; } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java index ddfd1e2..91735f2 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; public class CamelMainSupport { public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat."; - private static Logger log = LoggerFactory.getLogger(CamelMainSupport.class); + private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class); private Main camelMain; private CamelContext camel; @@ -83,7 +83,7 @@ public class CamelMainSupport { Properties camelProperties = new OrderedProperties(); camelProperties.putAll(orderedProps); - log.info("Setting initial properties in Camel context: [{}]", camelProperties); + LOG.info("Setting initial properties in Camel context: [{}]", camelProperties); this.camel.getPropertiesComponent().setInitialProperties(camelProperties); //creating the actual route @@ -93,15 +93,15 @@ public class CamelMainSupport { if (marshal != null && unmarshal != null) { throw new 
UnsupportedOperationException("Uses of both marshal (i.e. " + marshal + ") and unmarshal (i.e. " + unmarshal + ") is not supported"); } else if (marshal != null) { - log.info("Creating Camel route from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl); + LOG.info("Creating Camel route from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl); camel.getRegistry().bind(marshal, lookupAndInstantiateDataformat(marshal)); rd.marshal().custom(marshal); } else if (unmarshal != null) { - log.info("Creating Camel route from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl); + LOG.info("Creating Camel route from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl); camel.getRegistry().bind(unmarshal, lookupAndInstantiateDataformat(unmarshal)); rd.unmarshal().custom(unmarshal); } else { - log.info("Creating Camel route from({}).to({})", fromUrl, toUrl); + LOG.info("Creating Camel route from({}).to({})", fromUrl, toUrl); } rd.to(toUrl); } @@ -109,27 +109,27 @@ public class CamelMainSupport { } public void start() throws Exception { - log.info("Starting CamelContext"); + LOG.info("Starting CamelContext"); CamelContextStarter starter = new CamelContextStarter(); exService.execute(starter); startFinishedSignal.await(); if (starter.hasException()) { - log.info("CamelContext failed to start", starter.getException()); + LOG.info("CamelContext failed to start", starter.getException()); throw starter.getException(); } - log.info("CamelContext started"); + LOG.info("CamelContext started"); } public void stop() { - log.info("Stopping CamelContext"); + LOG.info("Stopping CamelContext"); camelMain.stop(); exService.shutdown(); - log.info("CamelContext stopped"); + LOG.info("CamelContext stopped"); } public ProducerTemplate createProducerTemplate() { @@ -190,7 +190,7 @@ public class CamelMainSupport { @Override public void afterStart(BaseMainSupport main) { - log.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due 
to CamelMainFinishedListener been called"); + LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called"); startFinishedSignal.countDown(); } @@ -217,10 +217,10 @@ public class CamelMainSupport { try { camelMain.run(); } catch (Exception e) { - log.error("An exception has occurred before CamelContext startup has finished", e); + LOG.error("An exception has occurred before CamelContext startup has finished", e); startException = e; if (startFinishedSignal.getCount() > 0) { - log.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception"); + LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception"); startFinishedSignal.countDown(); } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 934b78d..89205ae 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -16,20 +16,15 @@ */ package org.apache.camel.kafkaconnector; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.camel.ConsumerTemplate; -import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; -import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; @@ -40,16 +35,18 @@ import static org.junit.jupiter.api.Assertions.fail; public class CamelSourceTaskTest { + private static final String TIMER_URI = 
"timer:kafkaconnector?period=10&fixedRate=true&delay=0"; + @Test public void testSourcePolling() throws InterruptedException { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "timer:kafkaconnector"); + props.put("camel.source.url", TIMER_URI); props.put("camel.source.kafka.topic", "mytopic"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(2100L); + Thread.sleep(11L); List<SourceRecord> poll = camelSourceTask.poll(); assertEquals(2, poll.size()); assertEquals("mytopic", poll.get(0).topic()); @@ -62,9 +59,9 @@ public class CamelSourceTaskTest { break; } } - assertTrue(containsHeader); camelSourceTask.stop(); + assertTrue(containsHeader); } @Test @@ -82,7 +79,7 @@ public class CamelSourceTaskTest { // first we test if we have a key in the message with body template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234); - Thread.sleep(100L); + Thread.sleep(11L); List<SourceRecord> poll = camelSourceTask.poll(); assertEquals(1, poll.size()); @@ -92,7 +89,7 @@ public class CamelSourceTaskTest { // second we test if we have no key under the header template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234); - Thread.sleep(100L); + Thread.sleep(11L); poll = camelSourceTask.poll(); assertEquals(1, poll.size()); @@ -102,7 +99,7 @@ public class CamelSourceTaskTest { // third we test if we have the header but with null value template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null); - Thread.sleep(100L); + Thread.sleep(10L); camelSourceTask.poll(); assertEquals(1, poll.size()); @@ -126,7 +123,7 @@ public class CamelSourceTaskTest { // send first data template.sendBody("direct:start", "testing kafka connect"); - Thread.sleep(100L); + Thread.sleep(11L); List<SourceRecord> poll = camelSourceTask.poll(); assertEquals(1, poll.size()); @@ -138,7 +135,7 @@ public class CamelSourceTaskTest { // send second data 
template.sendBody("direct:start", true); - Thread.sleep(100L); + Thread.sleep(11L); poll = camelSourceTask.poll(); assertEquals(1, poll.size()); @@ -150,7 +147,7 @@ public class CamelSourceTaskTest { // second third data template.sendBody("direct:start", 1234L); - Thread.sleep(100L); + Thread.sleep(10L); poll = camelSourceTask.poll(); assertEquals(1, poll.size()); @@ -162,7 +159,7 @@ public class CamelSourceTaskTest { // third with null data template.sendBody("direct:start", null); - Thread.sleep(100L); + Thread.sleep(10L); poll = camelSourceTask.poll(); assertNull(poll.get(0).key()); assertNull(poll.get(0).keySchema()); @@ -175,50 +172,48 @@ public class CamelSourceTaskTest { @Test public void testSourcePollingTimeout() throws InterruptedException { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "timer:kafkaconnector"); + props.put("camel.source.url", TIMER_URI); props.put("camel.source.kafka.topic", "mytopic"); props.put("camel.source.maxPollDuration", "1"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(3000L); + long sleepTime = 30L; + Thread.sleep(sleepTime); List<SourceRecord> poll; int retries = 3; do { poll = camelSourceTask.poll(); if (poll == null) { retries--; - if (retries == 0) { fail("Exhausted the maximum retries and no record was returned"); } - - Thread.sleep(3000L); + Thread.sleep(sleepTime); } } while (poll == null && retries > 0); assertTrue(poll.size() >= 1, "Received messages are: " + poll.size() + ", expected between 1 and 2."); assertTrue(poll.size() <= 2, "Received messages are: " + poll.size() + ", expected between 1 and 2."); - camelSourceTask.stop(); } @Test public void testSourcePollingMaxRecordNumber() throws InterruptedException { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "timer:kafkaconnector"); + props.put("camel.source.url", TIMER_URI); props.put("camel.source.kafka.topic", "mytopic"); 
props.put("camel.source.maxBatchPollSize", "1"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(2000L); + Thread.sleep(11L); List<SourceRecord> poll = camelSourceTask.poll(); - assertEquals(1, poll.size()); - camelSourceTask.stop(); + + assertEquals(1, poll.size()); } @Test @@ -243,9 +238,9 @@ public class CamelSourceTaskTest { } @Test - public void testUrlPrecedenceOnComponentProperty() throws JsonProcessingException, InterruptedException { + public void testUrlPrecedenceOnComponentProperty() throws InterruptedException { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "timer:kafkaconnector"); + props.put("camel.source.url", TIMER_URI); props.put("camel.source.kafka.topic", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "shouldNotBeUsed"); props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed"); @@ -254,7 +249,7 @@ public class CamelSourceTaskTest { CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(2100L); + Thread.sleep(11L); List<SourceRecord> poll = camelSourceTask.poll(); assertEquals(2, poll.size()); assertEquals("mytopic", poll.get(0).topic()); @@ -267,13 +262,13 @@ public class CamelSourceTaskTest { break; } } - assertTrue(containsHeader); - camelSourceTask.stop(); + + assertTrue(containsHeader); } @Test - public void testSourcePollingUsingComponentProperty() throws JsonProcessingException, InterruptedException { + public void testSourcePollingUsingComponentProperty() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.kafka.topic", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); @@ -298,13 +293,14 @@ public class CamelSourceTaskTest { } assertTrue(containsHeader); - assertEquals(1, camelSourceTask.getCms().getEndpoints().stream().filter(e -> 
e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count()); + assertEquals(1, camelSourceTask.getCms().getEndpoints().stream() + .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count()); camelSourceTask.stop(); } @Test - public void testSourcePollingUsingMultipleComponentProperties() throws JsonProcessingException, InterruptedException { + public void testSourcePollingUsingMultipleComponentProperties() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.kafka.topic", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); @@ -330,7 +326,8 @@ public class CamelSourceTaskTest { } assertTrue(containsHeader); - assertEquals(1, camelSourceTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count()); + assertEquals(1, camelSourceTask.getCms().getEndpoints().stream() + .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count()); camelSourceTask.stop(); } diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java index aeb2da1..d1eef84 100644 --- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java +++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java @@ -112,7 +112,7 @@ public abstract class AbstractCamelKafkaConnectorMojo extends AbstractMojo { public void execute() throws MojoExecutionException, MojoFailureException { configureResourceManager(); if (!project.getArtifactId().equals(connectorsProjectName)) { - 
getLog().debug("Skipping porject " + project.getArtifactId() + " since it is not " + connectorsProjectName + " can be configured with <connectors-project-name> option."); + getLog().debug("Skipping project " + project.getArtifactId() + " since it is not " + connectorsProjectName + " can be configured with <connectors-project-name> option."); return; } try {