This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 3caeb0114f557c359f1261638b6a3fd96c28e8f4 Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Fri Oct 9 10:48:32 2020 +0200 core: use camel's built-in support for configuring data formats through properties #497 --- .../utils/CamelKafkaConnectMain.java | 42 +++------------------- .../camel/kafkaconnector/DataFormatTest.java | 11 +++--- 2 files changed, 9 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 191feb0..36ec56a 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -22,15 +22,12 @@ import java.util.Properties; import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; import org.apache.camel.ConsumerTemplate; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.kafkaconnector.CamelConnectorConfig; import org.apache.camel.main.SimpleMain; import org.apache.camel.model.RouteDefinition; -import org.apache.camel.spi.DataFormat; -import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -144,14 +141,12 @@ public class CamelKafkaConnectMain extends SimpleMain { //dataformats if (!ObjectHelper.isEmpty(marshallDataFormat)) { - LOG.info(".marshal().custom({})", marshallDataFormat); - getContext().getRegistry().bind(marshallDataFormat, lookupAndInstantiateDataformat(getContext(), marshallDataFormat)); - rd.marshal().custom(marshallDataFormat); + LOG.info(".marshal({})", marshallDataFormat); + rd.marshal(marshallDataFormat); } if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { - 
LOG.info(".unmarshal().custom({})", unmarshallDataFormat); - getContext().getRegistry().bind(unmarshallDataFormat, lookupAndInstantiateDataformat(getContext(), unmarshallDataFormat)); - rd.unmarshal().custom(unmarshallDataFormat); + LOG.info(".unmarshal({})", unmarshallDataFormat); + rd.unmarshal(unmarshallDataFormat); } if (getContext().getRegistry().lookupByName("aggregate") != null) { //aggregation @@ -170,33 +165,4 @@ public class CamelKafkaConnectMain extends SimpleMain { return camelMain; } } - - private static DataFormat lookupAndInstantiateDataformat(CamelContext camelContext, String dataformatName) { - DataFormat df = camelContext.resolveDataFormat(dataformatName); - - if (df == null) { - df = camelContext.createDataFormat(dataformatName); - - final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + "."; - final Properties props = camelContext.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix)); - - CamelContextAware.trySetCamelContext(df, camelContext); - - if (!props.isEmpty()) { - PropertyBindingSupport.build() - .withCamelContext(camelContext) - .withOptionPrefix(prefix) - .withRemoveParameters(false) - .withProperties((Map) props) - .withTarget(df) - .bind(); - } - } - - //TODO: move it to the caller? 
- if (df == null) { - throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName); - } - return df; - } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index 0ce5ab0..4e309d4 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -20,9 +20,9 @@ import java.util.HashMap; import java.util.Map; import org.apache.camel.component.hl7.HL7DataFormat; +import org.apache.camel.component.syslog.SyslogDataFormat; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain; -import org.apache.camel.model.dataformat.SyslogDataFormat; import org.apache.kafka.connect.errors.ConnectException; import org.junit.jupiter.api.Test; @@ -94,9 +94,9 @@ public class DataFormatTest { dcc.getRegistry().bind("syslog", syslogDf); cms.start(); - HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class); + HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7"); assertNotNull(hl7dfLoaded); - SyslogDataFormat syslogDfLoaded = dcc.getRegistry().lookupByNameAndType("syslog", SyslogDataFormat.class); + SyslogDataFormat syslogDfLoaded = (SyslogDataFormat)dcc.resolveDataFormat("syslog"); assertNotNull(syslogDfLoaded); cms.stop(); } @@ -119,7 +119,7 @@ public class DataFormatTest { dcc.getRegistry().bind("hl7", hl7df); cms.start(); - HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class); + HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7"); assertFalse(hl7dfLoaded.isValidate()); cms.stop(); } @@ -139,9 +139,8 @@ public class DataFormatTest { .withMarshallDataFormat("hl7") .build(dcc); - cms.start(); - HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", 
HL7DataFormat.class); + HL7DataFormat hl7dfLoaded = (HL7DataFormat)dcc.resolveDataFormat("hl7"); assertTrue(hl7dfLoaded.isValidate()); cms.stop(); }