This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch kamelets in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 8bbd57027cc708e73a9d5b7f2007f66b9ee1f201 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Mon Jul 26 11:34:35 2021 +0200 Related to #423 modularized kamelets and composed them to better autogenerate connectors from kamelets catalog --- .../utils/CamelKafkaConnectMain.java | 172 ++++++++++----------- .../camel/kafkaconnector/DataFormatTest.java | 9 +- 2 files changed, 80 insertions(+), 101 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 0871307..2e8d3a8 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; -import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContext; import org.apache.camel.ConsumerTemplate; import org.apache.camel.ProducerTemplate; @@ -31,11 +30,8 @@ import org.apache.camel.builder.ErrorHandlerBuilderRef; import org.apache.camel.builder.NoErrorHandlerBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.kafkaconnector.CamelConnectorConfig; -import org.apache.camel.kafkaconnector.CamelSinkTask; -import org.apache.camel.kafkaconnector.CamelSourceTask; import org.apache.camel.main.SimpleMain; import org.apache.camel.model.ProcessorDefinition; -import org.apache.camel.model.RouteTemplateDefinition; import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository; @@ -46,6 +42,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CamelKafkaConnectMain extends SimpleMain { + public static final String KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcMarshal."; + public static final String KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcUnMarshal."; + public static final String KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcAggregator."; + public static final String KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcIdempotent."; + public static final String KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX = "camel.kamelet.ckcRemoveHeader."; + private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class); protected volatile ConsumerTemplate consumerTemplate; @@ -228,36 +230,45 @@ public class CamelKafkaConnectMain extends SimpleMain { Properties camelProperties = new Properties(); camelProperties.putAll(props); + //error handler + camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder()); + if (errorHandler != null) { + switch (errorHandler) { + case "no": + camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder()); + break; + case "default": + camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay)); + break; + default: + break; + } + } + //dataformats if (!ObjectHelper.isEmpty(marshallDataFormat)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat); + camelProperties.put(KAMELET_MARSHAL_TEMPLATE_PARAMETERS_PREFIX + "marshal", marshallDataFormat); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat); + camelProperties.put(KAMELET_UNMARSHAL_TEMPLATE_PARAMETERS_PREFIX + "unmarshal", unmarshallDataFormat); } //aggregator if (!ObjectHelper.isEmpty(aggregationSize)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize)); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize)); + camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationSize", String.valueOf(aggregationSize)); } if (!ObjectHelper.isEmpty(aggregationTimeout)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout)); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout)); + camelProperties.put(KAMELET_AGGREGATORL_TEMPLATE_PARAMETERS_PREFIX + "aggregationTimeout", String.valueOf(aggregationTimeout)); } //idempotency if (idempotencyEnabled) { switch (expressionType) { case "body": - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}"); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}"); + camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${body}"); break; case "header": - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}"); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}"); + camelProperties.put(KAMELET_IDEMPOTENT_TEMPLATE_PARAMETERS_PREFIX + "idempotentExpression", "${headers." + expressionHeader + "}"); break; default: break; @@ -279,117 +290,92 @@ public class CamelKafkaConnectMain extends SimpleMain { //remove headers if (!ObjectHelper.isEmpty(headersExcludePattern)) { - camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern); - camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern); + camelProperties.put(KAMELET_REMOVEHEADER_TEMPLATE_PARAMETERS_PREFIX + "headersExcludePattern", headersExcludePattern); } + // log filtered properties and set initial camel properties List<String> filteredProps = camelProperties.entrySet().stream().map(this::filterSensitive).collect(Collectors.toList()); LOG.info("Setting initial properties in Camel context: [{}]", filteredProps); camelMain.setInitialProperties(camelProperties); - //error handler - camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder()); - if (errorHandler != null) { - switch (errorHandler) { - case "no": - camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new NoErrorHandlerBuilder()); - break; - case "default": - camelMain.getCamelContext().getRegistry().bind("ckcErrorHandler", new DefaultErrorHandlerBuilder().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay)); - break; - default: - break; - } - } - camelMain.configure().addRoutesBuilder(new RouteBuilder() { public void configure() { - //creating source template - RouteTemplateDefinition rtdSource = routeTemplate("ckcSource") - .templateParameter("fromUrl") - .templateParameter("errorHandler", "ckcErrorHandler") - + //create marshal template + routeTemplate("ckcMarshal") .templateParameter("marshal", "dummyDataformat") + .from("kamelet:source") + .marshal("{{marshal}}") + .to("kamelet:sink"); + + //create unmarshal template + routeTemplate("ckcUnMarshal") .templateParameter("unmarshal", "dummyDataformat") + .from("kamelet:source") + .marshal("{{unmarshal}}") + .to("kamelet:sink"); - //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? + //create aggregator template + routeTemplate("ckcAggregator") + //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME to ckcAggregationStrategy? .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) .templateParameter("aggregationSize", "1") .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE)) + .from("kamelet:source") + .aggregate(constant(true)) + .aggregationStrategyRef("{{aggregationStrategy}}") + .completionSize("{{aggregationSize}}") + .completionTimeout("{{aggregationTimeout}}") + .to("kamelet:sink") + .end(); + //create idempotent template + routeTemplate("ckcIdempotent") .templateParameter("idempotentExpression", "dummyExpression") .templateParameter("idempotentRepository", "ckcIdempotentRepository") - .templateParameter("headersExcludePattern", "(?!)"); - - - ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}") - .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); - if (!ObjectHelper.isEmpty(marshallDataFormat)) { - rdInTemplateSource = rdInTemplateSource.marshal("{{marshal}}"); - } - if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshal}}"); - } - - if (getContext().getRegistry().lookupByName("aggregate") != null) { - AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class); - rdInTemplateSource = rdInTemplateSource.aggregate(s) - .constant(true) - .completionSize("{{aggregationSize}}") - .completionTimeout("{{aggregationTimeout}}"); - } + .from("kamelet:source") + .idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}") + .to("kamelet:sink"); - if (idempotencyEnabled) { - rdInTemplateSource = rdInTemplateSource.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}"); - } + //create removeHeader template + routeTemplate("ckcRemoveHeader") + .templateParameter("headersExcludePattern", "(?!)") + .from("kamelet:source") + .removeHeaders("{{headersExcludePattern}}") + .to("kamelet:sink"); - rdInTemplateSource.removeHeaders("{{headersExcludePattern}}") + //creating source template + routeTemplate("ckcSource") + .templateParameter("fromUrl") + .templateParameter("errorHandler", "ckcErrorHandler") + .from("{{fromUrl}}") + .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")) .to("kamelet:sink"); //creating sink template - RouteTemplateDefinition rtdSink = routeTemplate("ckcSink") + routeTemplate("ckcSink") .templateParameter("toUrl") .templateParameter("errorHandler", "ckcErrorHandler") - .templateParameter("marshal", "dummyDataformat") - .templateParameter("unmarshal", "dummyDataformat") - - //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? - .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) - .templateParameter("aggregationSize", "1") - .templateParameter("aggregationTimeout", String.valueOf(Long.MAX_VALUE)) - - .templateParameter("idempotentExpression", "dummyExpression") - .templateParameter("idempotentRepository", "ckcIdempotentRepository") - .templateParameter("headersExcludePattern", "(?!)"); - + .from("kamelet:source") + .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")) + .to("{{toUrl}}"); - ProcessorDefinition<?> rdInTemplateSink = rtdSink.from("kamelet:source") - .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); + //creating the actual route + ProcessorDefinition<?> rd = from(from); if (!ObjectHelper.isEmpty(marshallDataFormat)) { - rdInTemplateSink = rdInTemplateSink.marshal("{{marshal}}"); + rd = rd.kamelet("ckcMarshal"); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - rdInTemplateSink = rdInTemplateSink.unmarshal("{{unmarshal}}"); + rd = rd.kamelet("ckcUnMarshal"); } - if (getContext().getRegistry().lookupByName("aggregate") != null) { - AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class); - rdInTemplateSink = rdInTemplateSink.aggregate(s) - .constant(true) - .completionSize("{{aggregationSize}}") - .completionTimeout("{{aggregationTimeout}}"); + rd = rd.kamelet("ckcAggregator"); } - if (idempotencyEnabled) { - rdInTemplateSink = rdInTemplateSink.idempotentConsumer(simple("{{idempotentExpression}}")).messageIdRepositoryRef("{{idempotentRepository}}"); + rd = rd.kamelet("ckcIdempotent"); } - - rdInTemplateSink.removeHeaders("{{headersExcludePattern}}") - .to("{{toUrl}}"); - - //creating the actual route - from(from).toD(to); + rd = rd.kamelet("ckcRemoveHeader"); + rd.toD(to); } }); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index c3d26a4..36a886c 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -16,9 +16,7 @@ */ package org.apache.camel.kafkaconnector; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.camel.component.hl7.HL7DataFormat; @@ -26,7 +24,6 @@ import org.apache.camel.component.syslog.SyslogDataFormat; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain; import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -69,11 +66,7 @@ public class DataFormatTest { props.put("camel.sink.marshal", "missingDataformat"); CamelSinkTask camelsinkTask = new CamelSinkTask(); - camelsinkTask.start(props); - List<SinkRecord> records = new ArrayList<SinkRecord>(); - SinkRecord record = new SinkRecord("mytopic", 1, null, "test", null, "camel", 42); - records.add(record); - assertThrows(ConnectException.class, () -> camelsinkTask.put(records)); + assertThrows(ConnectException.class, () -> camelsinkTask.start(props)); // No need to check the stop method. The error is already thrown/caught during startup. camelsinkTask.stop(); }