This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-0.7.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit dc8f444b972b1a015bfed2cd6476f83137e7c8bf
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Fri Jan 15 00:05:04 2021 +0100

    Added SchemaAndStructToJsonTransform fix #843
---
 .../transforms/SchemaAndStructToJsonTransform.java |  77 +++++++++++++
 .../SchemaAndStructToJsonTransformTest.java        | 128 +++++++++++++++++++++
 2 files changed, 205 insertions(+)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransform.java
new file mode 100644
index 0000000..72d10db
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransform.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SchemaAndStructToJsonTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaAndStructToJsonTransform.class);
+
+    private JsonConverter jsonConverter;
+
+    @Override
+    public R apply(R r) {
+        LOG.debug("Incoming record: {}", r);
+
+        if (r.value() != null && r.valueSchema() != null) {
+            byte[] json = jsonConverter.fromConnectData(r.topic(), r.valueSchema(), r.value());
+
+            if (json == null) {
+                LOG.warn("No record was converted as part of this 
transformation, resulting json byte[] was null.");
+                return r;
+            }
+
+            LOG.debug("Json created: {}", new String(json));
+
+            return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
+                    Schema.BYTES_SCHEMA, json, r.timestamp());
+        } else {
+            LOG.debug("Incoming record with a null value or a null schema, 
nothing to be done.");
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return JsonConverterConfig.configDef();
+    }
+
+    @Override
+    public void close() {
+        //NOOP
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        jsonConverter = new JsonConverter();
+        Map<String, Object> conf = new HashMap<>(configs);
+        conf.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
+        jsonConverter.configure(conf);
+    }
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransformTest.java
new file mode 100644
index 0000000..159c7d7
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransformTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.transforms.SlackMessage.Attachment;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SchemaAndStructToJsonTransformTest {
+
+    @Test
+    public void testRecordValueConversion() {
+        SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        SlackMessage sm = new SlackMessage();
+
+        Attachment at1 = new Attachment();
+        Attachment.Field at1f1 = new Attachment.Field();
+        at1f1.setTitle("ciao");
+        at1f1.setShortValue(true);
+        at1.setFields(new ArrayList<Attachment.Field>(Collections.singleton(at1f1)));
+        at1.setAuthorName("Andrea");
+
+        Attachment at2 = new Attachment();
+        at2.setColor("green");
+
+        ArrayList<Attachment> attachments = new ArrayList<>();
+        attachments.add(at1);
+        attachments.add(at2);
+
+        sm.setText("text");
+        sm.setAttachments(attachments);
+
+        ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply(
+                new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, sm));
+
+        SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform();
+        schemaAndStructToJsonTransform.configure(Collections.emptyMap());
+
+        ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+        assertEquals(byte[].class.getName(), transformedCr.value().getClass().getName());
+        assertTrue(new String((byte[])transformedCr.value()).contains("schema"));
+    }
+
+    @Test
+    public void testMapValueConversionSchemaDisabled() {
+        SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        PojoWithMap pwm = new PojoWithMap();
+        pwm.addToMap("ciao", 9);
+
+        ConnectRecord cr = sourcePojoToSchemaAndStructTransform.apply(new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, pwm));
+
+        SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform();
+        schemaAndStructToJsonTransform.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false));
+
+        ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+        assertEquals(byte[].class.getName(), transformedCr.value().getClass().getName());
+        assertFalse(new String((byte[])transformedCr.value()).contains("schema"));
+    }
+
+    @Test()
+    public void testNotStructSchemaConversion() {
+        SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform();
+        schemaAndStructToJsonTransform.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true));
+
+        Map map = Collections.singletonMap("ciao", 9);
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                null, map);
+
+        ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr);
+        assertEquals(cr, transformedCr);
+    }
+
+    @Test()
+    public void testNullValueConversion() {
+        SchemaAndStructToJsonTransform schemaAndStructToJsonTransform = new SchemaAndStructToJsonTransform();
+        schemaAndStructToJsonTransform.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true));
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, null);
+
+        ConnectRecord transformedCr = schemaAndStructToJsonTransform.apply(cr);
+        assertEquals(cr, transformedCr);
+    }
+}
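
For reference, a minimal standalone usage sketch of the new transform (illustrative only, not part of this commit; the topic name, key and the "text" field below are made up for the example):

    import java.util.Collections;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.json.JsonConverterConfig;
    import org.apache.kafka.connect.source.SourceRecord;

    public class SchemaAndStructToJsonTransformExample {
        public static void main(String[] args) {
            // Build a schema-backed Struct value, the shape this transform expects.
            Schema valueSchema = SchemaBuilder.struct()
                    .field("text", Schema.STRING_SCHEMA)
                    .build();
            Struct value = new Struct(valueSchema).put("text", "hello");

            SourceRecord record = new SourceRecord(null, null, "exampleTopic",
                    Schema.STRING_SCHEMA, "exampleKey",
                    valueSchema, value);

            SchemaAndStructToJsonTransform<SourceRecord> transform = new SchemaAndStructToJsonTransform<>();
            // schemas.enable=false keeps only the payload, without the JSON envelope schema.
            transform.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false));

            SourceRecord converted = transform.apply(record);
            // The record value is now a byte[] holding {"text":"hello"}.
            System.out.println(new String((byte[]) converted.value()));

            transform.close();
        }
    }

In a connector configuration the transform would typically be registered through the standard SMT properties (transforms=<alias>, transforms.<alias>.type=org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform); since config() delegates to JsonConverterConfig.configDef(), JsonConverter options such as schemas.enable can be passed the same way.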
