This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch kamelets in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 112e13f17bec3dbe9d0814cc76b15c714de4a7d9 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Sat May 15 07:59:12 2021 +0200 Related to #423 resolved a problem with marshal/unmarshal after fixin https://issues.apache.org/jira/browse/CAMEL-16551 for sources --- .../utils/CamelKafkaConnectMain.java | 29 +++++++++++----------- .../camel/kafkaconnector/CamelSourceTaskTest.java | 5 ---- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 6e7dbdf..036375b 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -228,16 +228,15 @@ public class CamelKafkaConnectMain extends SimpleMain { Properties camelProperties = new Properties(); camelProperties.putAll(props); - //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved -// //dataformats -// if (!ObjectHelper.isEmpty(marshallDataFormat)) { -// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); -// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); -// } -// if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { -// camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); -// camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); -// } + //dataformats + if (!ObjectHelper.isEmpty(marshallDataFormat)) { + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "marshall", marshallDataFormat); + } + if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { + camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); + camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "unmarshall", unmarshallDataFormat); + } //aggregator if (!ObjectHelper.isEmpty(aggregationSize)) { @@ -310,9 +309,9 @@ public class CamelKafkaConnectMain extends SimpleMain { RouteTemplateDefinition rtdSource = routeTemplate("ckcSource") .templateParameter("fromUrl") .templateParameter("errorHandler", "ckcErrorHandler") - //TODO: enable or delete these parameters once https://issues.apache.org/jira/browse/CAMEL-16551 is resolved -// .templateParameter("marshall", "dummyDataformat") -// .templateParameter("unmarshall", "dummyDataformat") + + .templateParameter("marshall", "dummyDataformat") + .templateParameter("unmarshall", "dummyDataformat") //TODO: change CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy? .templateParameter("aggregationStrategy", CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME) @@ -327,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain { ProcessorDefinition<?> rdInTemplateSource = rtdSource.from("{{fromUrl}}") .errorHandler(new ErrorHandlerBuilderRef("{{errorHandler}}")); if (!ObjectHelper.isEmpty(marshallDataFormat)) { - rdInTemplateSource = rdInTemplateSource.marshal(marshallDataFormat); + rdInTemplateSource = rdInTemplateSource.marshal("{{marshall}}"); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - rdInTemplateSource = rdInTemplateSource.unmarshal(unmarshallDataFormat); + rdInTemplateSource = rdInTemplateSource.unmarshal("{{unmarshall}}"); } if (getContext().getRegistry().lookupByName("aggregate") != null) { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index b1271ac..36ae9e2 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -235,9 +235,6 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); -// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size()); - - sourceTask.getCms().getCamelContext().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("timer")) .forEach(e -> { @@ -261,8 +258,6 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); -// assertEquals(4, sourceTask.getCms().getCamelContext().getEndpoints().size()); - sourceTask.getCms().getCamelContext().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("seda")) .forEach(e -> {