This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch kamelets in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 251fde7e0130611f97862100c77e372723020429 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Mon Apr 26 01:38:42 2021 +0200 Related to #423 : caonverted source and sink to use camel-kamelets. --- core/pom.xml | 10 +- .../apache/camel/kafkaconnector/CamelSinkTask.java | 5 +- .../camel/kafkaconnector/CamelSourceTask.java | 5 +- .../utils/CamelKafkaConnectMain.java | 272 ++++++++++++--------- .../camel/kafkaconnector/CamelSourceTaskTest.java | 10 +- .../camel/kafkaconnector/DataFormatTest.java | 10 +- 6 files changed, 191 insertions(+), 121 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index f6c014f..3138163 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -53,12 +53,20 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-kafka</artifactId> + <artifactId>camel-kamelet</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core-languages</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-xml-jaxb</artifactId> + </dependency> <!-- Tools --> <dependency> diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 82c16d2..4e5a201 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -42,6 +42,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CamelSinkTask extends SinkTask { + public static final String KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSink."; + public static final String KAFKA_RECORD_KEY_HEADER = "camel.kafka.connector.record.key"; public static final String HEADER_CAMEL_PREFIX = "CamelHeader."; public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty."; @@ -119,8 +121,9 @@ public class CamelSinkTask extends SinkTask { CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SINK_PATH_PROPERTIES_PREFIX); } + actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl); - cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl) + cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink") .withProperties(actualProps) .withUnmarshallDataFormat(unmarshaller) .withMarshallDataFormat(marshaller) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 00ce145..77ce636 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; public class CamelSourceTask extends SourceTask { + public static final String KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcSource."; public static final String HEADER_CAMEL_PREFIX = "CamelHeader."; public static final String PROPERTY_CAMEL_PREFIX = "CamelProperty."; @@ -145,8 +146,9 @@ public class CamelSourceTask extends SourceTask { config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); } + actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl); - cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl) + cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl) .withProperties(actualProps) .withUnmarshallDataFormat(unmarshaller) .withMarshallDataFormat(marshaller) @@ -171,6 +173,7 @@ public class CamelSourceTask extends SourceTask { consumer.start(); cms.start(); + LOG.info("CamelSourceTask connector task started"); } catch (Exception e) { throw new ConnectException("Failed to create and start Camel context", e); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index d031b20..6e7dbdf 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -26,10 +26,16 @@ import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContext; import org.apache.camel.ConsumerTemplate; import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.DefaultErrorHandlerBuilder; +import org.apache.camel.builder.ErrorHandlerBuilderRef; +import org.apache.camel.builder.NoErrorHandlerBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.kafkaconnector.CamelConnectorConfig; +import org.apache.camel.kafkaconnector.CamelSinkTask; +import org.apache.camel.kafkaconnector.CamelSourceTask; import org.apache.camel.main.SimpleMain; -import org.apache.camel.model.RouteDefinition; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.RouteTemplateDefinition; import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository; @@ -40,7 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CamelKafkaConnectMain extends SimpleMain { - public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat."; private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class); protected volatile ConsumerTemplate consumerTemplate; @@ -140,67 +145,67 @@ public class CamelKafkaConnectMain extends SimpleMain { this.aggregationTimeout = aggregationTimeout; return this; } - + public Builder withErrorHandler(String errorHandler) { this.errorHandler = errorHandler; return this; } - + public Builder withMaxRedeliveries(int maxRedeliveries) { this.maxRedeliveries = maxRedeliveries; return this; } - + public Builder withRedeliveryDelay(long redeliveryDelay) { this.redeliveryDelay = redeliveryDelay; return this; } - + public Builder withIdempotencyEnabled(boolean idempotencyEnabled) { this.idempotencyEnabled = idempotencyEnabled; return this; } - + public Builder withExpressionType(String expressionType) { this.expressionType = expressionType; return this; } - + public Builder withExpressionHeader(String expressionHeader) { this.expressionHeader = expressionHeader; return this; } - + public Builder withMemoryDimension(int memoryDimension) { this.memoryDimension = memoryDimension; return this; } - + public Builder withIdempotentRepositoryType(String idempotentRepositoryType) { this.idempotentRepositoryType = idempotentRepositoryType; return this; } - + public Builder withIdempotentRepositoryTopicName(String idempotentRepositoryTopicName) { this.idempotentRepositoryTopicName = idempotentRepositoryTopicName; return this; } - + public Builder withIdempotentRepositoryKafkaServers(String idempotentRepositoryKafkaServers) { this.idempotentRepositoryKafkaServers = idempotentRepositoryKafkaServers; return this; } - + public Builder withIdempotentRepositoryKafkaMaxCacheSize(int idempotentRepositoryKafkaMaxCacheSize) { this.idempotentRepositoryKafkaMaxCacheSize = idempotentRepositoryKafkaMaxCacheSize; return this; } - + public Builder withIdempotentRepositoryKafkaPollDuration(int idempotentRepositoryKafkaPollDuration) { this.idempotentRepositoryKafkaPollDuration = idempotentRepositoryKafkaPollDuration; return this; } - + public Builder withHeadersExcludePattern(String headersExcludePattern) { this.headersExcludePattern = headersExcludePattern; return this; @@ -214,21 +219,51 @@ public class CamelKafkaConnectMain extends SimpleMain { return entry.getKey() + "=" + entry.getValue(); } - public CamelKafkaConnectMain build(CamelContext camelContext) { CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext); camelMain.configure().setAutoConfigurationLogSummary(false); + //TODO: make it configurable + camelMain.configure().setDumpRoutes(true); Properties camelProperties = new Properties(); camelProperties.putAll(props); - List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList()); + //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved +// //dataformats +// if (!ObjectHelper.isEmpty(marshallDataFormat)) { +// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); +// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); +// } +// if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { +// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); +// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); +// } - LOG.info("Setting initial properties in Camel context: [{}]", filteredProps); - camelMain.setInitialProperties(camelProperties); - - // Instantianting the idempotent Repository here and inject it in registry to be referenced + //aggregator + if (!ObjectHelper.isEmpty(aggregationSize)) { + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize)); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize)); + } + if (!ObjectHelper.isEmpty(aggregationTimeout)) { + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout)); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout)); + } + + //idempotency if (idempotencyEnabled) { + switch (expressionType) { + case "body": + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}"); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}"); + break; + case "header": + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}"); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}"); + break; + default: + break; + } + // Instantiating the idempotent Repository here and inject it in registry to be referenced IdempotentRepository idempotentRepo = null; switch (idempotentRepositoryType) { case "memory": @@ -240,110 +275,123 @@ public class CamelKafkaConnectMain extends SimpleMain { default: break; } - camelMain.getCamelContext().getRegistry().bind("idempotentRepository", idempotentRepo); + camelMain.getCamelContext().getRegistry().bind("ckcIdempotentRepository", idempotentRepo); + } + + //remove headers + if (!ObjectHelper.isEmpty(headersExcludePattern)) { + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern); + } + + List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList()); + LOG.info("Setting initial properties in Camel context: [{}]", filteredProps); + camelMain.setInitialProperties(camelProperties); + + //error handler + camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder()); + if (errorHandler != null) { + switch (errorHandler) { + case "no": + camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder()); + break; + case "default": + camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay)); + break; + default: + break; + } } - //creating the actual route camelMain.configure().addRoutesBuilder(new RouteBuilder() { public void configure() { - //from - RouteDefinition rd = from(from); - LOG.info("Creating Camel route from({})", from); - - if (!ObjectHelper.isEmpty(errorHandler)) { - switch (errorHandler) { - case "no": - rd.errorHandler(noErrorHandler()); - break; - case "default": - rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay)); - break; - default: - break; - } + + //creating source template + RouteTemplateDefinition rtdSource = routeTemplate("ckcSource") + .templateParameter("fromUrl") + .templateParameter("errorHandler", "ckcErrorHandler") + //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved +// .templateParameter("marshall", "dummyDataformat") +// .templateParameter("unmarshall", "dummyDataformat") + + //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? + .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) + .templateParameter("aggregationSize", "1") + .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE)) + + .templateParameter("idempotentExpression", "dummyExpression") + .templateParameter("idempotentRepository", "ckcIdempotentRepository") + .templateParameter("headersExcludePattern", "(?!)"); + + + ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}") + .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); + if (!ObjectHelper.isEmpty(marshallDataFormat)) { + rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat); + } + if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { + rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat); } - //dataformats + if (getContext().getRegistry().lookupByName("aggregate") != null) { + AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class); + rdInTemplateSource = rdInTemplateSource.aggregate(s) + .constant(true) + .completionSize("{{aggregationSize}}") + .completionTimeout("{{aggregationTimeout}}"); + } + + if (idempotencyEnabled) { + rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}"); + } + + rdInTemplateSource.removeHeaders("{{headersExcludePattern}}") + .to("kamelet:sink"); + + //creating sink template + RouteTemplateDefinition rtdSink = routeTemplate("ckcSink") + .templateParameter("toUrl") + .templateParameter("errorHandler", "ckcErrorHandler") + //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved +// .templateParameter("marshall", "dummyDataformat") +// .templateParameter("unmarshall", "dummyDataformat") + + //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? + .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) + .templateParameter("aggregationSize", "1") + .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE)) + + .templateParameter("idempotentExpression", "dummyExpression") + .templateParameter("idempotentRepository", "ckcIdempotentRepository") + .templateParameter("headersExcludePattern", "(?!)"); + + + ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source") + .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); if (!ObjectHelper.isEmpty(marshallDataFormat)) { - LOG.info(".marshal({})", marshallDataFormat); - rd.marshal(marshallDataFormat); + rdInTemplateSink = rdInTemplateSink.marshal(marshallDataFormat); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - LOG.info(".unmarshal({})", unmarshallDataFormat); - rd.unmarshal(unmarshallDataFormat); + rdInTemplateSink = rdInTemplateSink.unmarshal(unmarshallDataFormat); } + if (getContext().getRegistry().lookupByName("aggregate") != null) { - //aggregation AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class); - if (idempotencyEnabled) { - switch (expressionType) { - case "body": - LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(body(), + " - + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); - LOG.info(".to({})", to); - if (ObjectHelper.isEmpty(headersExcludePattern)) { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); - } else { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) - .idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); - } - break; - case "header": - LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({}).idempotentConsumer(header(expressionHeader), + " - + "MemoryIdempotentRepository.memoryIdempotentRepository({}))", s, aggregationSize, aggregationTimeout, memoryDimension); - LOG.info(".to({})", to); - if (ObjectHelper.isEmpty(headersExcludePattern)) { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) - .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); - } else { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) - .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); - } - break; - default: - break; - } - } else { - LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); - LOG.info(".to({})", to); - if (ObjectHelper.isEmpty(headersExcludePattern)) { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); - } else { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to); - } - } - } else { - if (idempotencyEnabled) { - switch (expressionType) { - case "body": - LOG.info("idempotentConsumer(body(), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); - if (ObjectHelper.isEmpty(headersExcludePattern)) { - rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); - } else { - rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); - } - break; - case "header": - LOG.info("idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository({})).to({})", memoryDimension, to); - if (ObjectHelper.isEmpty(headersExcludePattern)) { - rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); - } else { - rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); - } - break; - default: - break; - } - } else { - //to - LOG.info(".to({})", to); - if (ObjectHelper.isEmpty(headersExcludePattern)) { - rd.toD(to); - } else { - rd.removeHeaders(headersExcludePattern).toD(to); - } - } + rdInTemplateSink = rdInTemplateSink.aggregate(s) + .constant(true) + .completionSize("{{aggregationSize}}") + .completionTimeout("{{aggregationTimeout}}"); } + + if (idempotencyEnabled) { + rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}"); + } + + rdInTemplateSink.removeHeaders("{{headersExcludePattern}}") + .to("{{toUrl}}"); + + //creating the actual route + from(from).toD(to); } }); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 5c99ad0..b1271ac 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -45,7 +45,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class CamelSourceTaskTest { - private static final String DIRECT_URI = "direct:start"; private static final String TOPIC_NAME = "my-topic"; @@ -225,7 +224,7 @@ public class CamelSourceTaskTest { } @Test - public void testUrlPrecedenceOnComponentProperty() { + public void testUrlPrecedenceOnComponentProperty() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "timer:foo?period=10&repeatCount=2"); @@ -236,7 +235,8 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); - assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size()); +// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size()); + sourceTask.getCms().getCamelContext().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("timer")) @@ -261,10 +261,10 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); - assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size()); +// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size()); sourceTask.getCms().getCamelContext().getEndpoints().stream() - .filter(e -> e.getEndpointUri().startsWith("direct")) + .filter(e -> e.getEndpointUri().startsWith("seda")) .forEach(e -> { assertTrue(e.getEndpointUri().contains("end")); assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10")); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index 6715843..c3d26a4 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -16,7 +16,9 @@ */ package org.apache.camel.kafkaconnector; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.camel.component.hl7.HL7DataFormat; @@ -24,6 +26,7 @@ import org.apache.camel.component.syslog.SyslogDataFormat; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -66,8 +69,13 @@ public class DataFormatTest { props.put("camel.sink.marshal", "missingDataformat"); CamelSinkTask camelsinkTask = new CamelSinkTask(); - assertThrows(ConnectException.class, () -> camelsinkTask.start(props)); + camelsinkTask.start(props); + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42); + records.add(record); + assertThrows(ConnectException.class, () -> camelsinkTask.put(records)); // No need to check the stop method. The error is already thrown/caught during startup. + camelsinkTask.stop(); } @Test