This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-kafka-connector-0.7.x in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit dc8f444b972b1a015bfed2cd6476f83137e7c8bf Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Fri Jan 15 00:05:04 2021 +0100 Added SchemaAndStructToJsonTransform fix #843 --- .../transforms/SchemaAndStructToJsonTransform.java | 77 +++++++++++++ .../SchemaAndStructToJsonTransformTest.java | 128 +++++++++++++++++++++ 2 files changed, 205 insertions(+) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransform.java new file mode 100644 index 0000000..72d10db --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransform.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.transforms; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SchemaAndStructToJsonTransform<R extends ConnectRecord<R>> implements Transformation<R> { + private static final Logger LOG = LoggerFactory.getLogger(SchemaAndStructToJsonTransform.class); + + private JsonConverter jsonConverter; + + @Override + public R apply(R r) { + LOG.debug("Incoming record: {}", r); + + if (r.value() != null && r.valueSchema() != null) { + byte[] json = jsonConverter.fromConnectData(r.topic(), r.valueSchema(), r.value()); + + if (json == null) { + LOG.warn("No record was converted as part of this transformation, resulting json byte[] was null."); + return r; + } + + LOG.debug("Json created: {}", new String(json)); + + return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), + Schema.BYTES_SCHEMA, json, r.timestamp()); + } else { + LOG.debug("Incoming record with a null value or a null schema, nothing to be done."); + return r; + } + } + + @Override + public ConfigDef config() { + return JsonConverterConfig.configDef(); + } + + @Override + public void close() { + //NOOP + } + + @Override + public void configure(Map<String, ?> configs) { + jsonConverter = new JsonConverter(); + Map<String, Object> conf = new HashMap<>(configs); + conf.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); + jsonConverter.configure(conf); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransformTest.java new file mode 100644 index 0000000..159c7d7 --- /dev/null +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransformTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; + +import org.apache.camel.kafkaconnector.transforms.SlackMessage.Attachment; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SchemaAndStructToJsonTransformTest { + + @Test + public void testRecordValueConversion() { + SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform(); + sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap()); + + SlackMessage sm = new SlackMessage(); + + Attachment at1 = new Attachment(); + Attachment.Field at1f1 = new Attachment.Field(); + at1f1.setTitle("ciao"); + at1f1.setShortValue(true); + at1.setFields(new ArrayList<Attachment.Field>(Collections.singleton(at1f1))); + at1.setAuthorName("Andrea"); + + Attachment at2 = new Attachment(); + at2.setColor("green"); + + ArrayList<Attachment> attachments = new ArrayList<>(); + attachments.add(at1); + attachments.add(at2); + + sm.setText("text"); + sm.setAttachments(attachments); + + ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply( + new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + Schema.BYTES_SCHEMA, sm)); + + SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform(); + schemaAndStructToJsonTransform.configure(Collections.emptyMap()); + + ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr); + + assertEquals("testTopic", transformedCr.topic()); + assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema()); + assertEquals("testKeyValue", transformedCr.key()); + assertEquals(byte[].class.getName(), transformedCr.value().getClass().getName()); + assertTrue(new String((byte[])transformedCr.value()).contains("schema")); + } + + @Test + public void testMapValueConversionSchemaDisabled() { + SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform(); + sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap()); + + PojoWithMap pwm = new PojoWithMap(); + pwm.addToMap("ciao", 9); + + ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply(new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + Schema.BYTES_SCHEMA, pwm)); + + SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform(); + schemaAndStructToJsonTransform.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false)); + + ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr); + + assertEquals("testTopic", transformedCr.topic()); + assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema()); + assertEquals("testKeyValue", transformedCr.key()); + assertEquals(byte[].class.getName(), transformedCr.value().getClass().getName()); + assertFalse(new String((byte[])transformedCr.value()).contains("schema")); + } + + @Test() + public void testNotStructSchemaConversion() { + SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform(); + schemaAndStructToJsonTransform.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true)); + + Map map = Collections.singletonMap("ciao", 9); + + ConnectRecord cr = new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + null, map); + + ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr); + assertEquals(cr, transformedCr); + } + + @Test() + public void testNullValueConversion() { + SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform(); + schemaAndStructToJsonTransform.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true)); + + ConnectRecord cr = new SourceRecord(null, null, "testTopic", + Schema.STRING_SCHEMA, "testKeyValue", + Schema.BYTES_SCHEMA, null); + + ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr); + assertEquals(cr, transformedCr); + } +}