This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new a024c7c create a toHeader SMT #902 a024c7c is described below commit a024c7cff76c410b24da85e3bcf7d81154ed4fe0 Author: Luigi De Masi <ldem...@redhat.com> AuthorDate: Thu Jan 28 18:26:26 2021 +0100 create a toHeader SMT #902 --- .../transforms/FieldsToHeadersTransform.java | 192 ++++++++++++ .../transforms/FieldsToHeadersTransformTest.java | 349 +++++++++++++++++++++ 2 files changed, 541 insertions(+) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java new file mode 100644 index 0000000..ed74886 --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.kafkaconnector.transforms;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;

/**
 * Single Message Transform (SMT) that copies the values of configured record fields
 * into record headers.
 *
 * <p>Configuration:
 * <ul>
 *   <li>{@code fields}  - comma-separated names of the fields to extract;</li>
 *   <li>{@code headers} - comma-separated names of the headers to populate, matched
 *       to {@code fields} positionally.</li>
 * </ul>
 *
 * <p>Every field must have a corresponding header (configuration fails otherwise);
 * extra headers with no corresponding field are only warned about and ignored.
 * Use the {@link Key} subclass to extract from the record key and the {@link Value}
 * subclass to extract from the record value. Records without a schema are expected
 * to carry a {@code Map} payload; records with a schema are expected to carry a
 * {@code Struct} payload.
 */
public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final Logger LOG = LoggerFactory.getLogger(FieldsToHeadersTransform.class);

    private static final String PURPOSE = "fields extraction to headers";
    private static final String FIELDS_CONFIG = "fields";
    private static final String HEADERS_CONFIG = "headers";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
                    "Fields names to extract and set to headers")
            .define(HEADERS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
                    "Headers names to set with extracted fields");

    private List<String> fields;

    private List<String> headers;

    /** @return the schema of the record part (key or value) this transform operates on, or null if schemaless */
    protected abstract Schema operatingSchema(R record);

    /** @return the record part (key or value) this transform operates on */
    protected abstract Object operatingValue(R record);

    /**
     * Adds one header per configured field, carrying the field's value and (when the
     * record has a schema) the field's schema. The record itself is mutated and returned.
     *
     * @param r the record to transform
     * @return the same record instance with the headers added
     */
    @Override
    public R apply(R r) {
        RecordValue value = createRecordValue(r);
        // fields.size() <= headers.size() is guaranteed by validateConfig()
        for (int i = 0; i < fields.size(); i++) {
            String field = fields.get(i);
            r.headers().add(headers.get(i), value.getFieldValue(field), value.getFieldSchema(field));
        }
        return r;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        // no resources to release
    }

    /**
     * Parses and validates the {@code fields}/{@code headers} lists.
     *
     * @throws IllegalArgumentException if either list contains blank entries or a
     *         field has no corresponding header
     */
    @SuppressWarnings("unchecked") // ConfigDef.Type.LIST values are parsed as List<String>
    @Override
    public void configure(Map<String, ?> map) {
        Map<String, Object> parsedConfig = CONFIG_DEF.parse(map);
        fields = (List<String>) parsedConfig.getOrDefault(FIELDS_CONFIG, new ArrayList<>());
        headers = (List<String>) parsedConfig.getOrDefault(HEADERS_CONFIG, new ArrayList<>());
        validateConfig();
    }

    private void validateConfig() {
        boolean validFields = fields.stream().allMatch(name -> name != null && !name.trim().isEmpty());
        boolean validHeaders = headers.stream().allMatch(name -> name != null && !name.trim().isEmpty());

        if (!(validFields && validHeaders)) {
            throw new IllegalArgumentException("headers and fields configuration properties cannot be null or contain empty elements.");
        }
        if (fields.size() > headers.size()) {
            // Every field must map to a header; report exactly the orphaned fields.
            String fieldsWithoutCorrespondingHeaders = fields.subList(headers.size(), fields.size()).stream().collect(Collectors.joining(","));
            throw new IllegalArgumentException("There is no corresponding header(s) configured for the following field(s): " + fieldsWithoutCorrespondingHeaders);
        }
        if (headers.size() > fields.size()) {
            // Extra headers are tolerated: warn and ignore them.
            String headersWithoutCorrespondingFields = headers.subList(fields.size(), headers.size()).stream().collect(Collectors.joining(","));
            LOG.warn("There is no corresponding header(s) for the following field(s): {} ", headersWithoutCorrespondingFields);
        }
    }

    /** Wraps the operating value as a {@link RecordValue}, dispatching on schema presence. */
    private RecordValue createRecordValue(R r) {
        final Schema schema = operatingSchema(r);
        if (schema == null) {
            return new MapRecordValue(requireMapOrNull(operatingValue(r), PURPOSE));
        }
        return new StructRecordValue(requireStructOrNull(operatingValue(r), PURPOSE), schema);
    }

    /** Variant that extracts fields from the record key. */
    public static class Key<R extends ConnectRecord<R>> extends FieldsToHeadersTransform<R> {

        @Override
        protected Schema operatingSchema(R record) {
            return record.keySchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.key();
        }
    }

    /** Variant that extracts fields from the record value. */
    public static class Value<R extends ConnectRecord<R>> extends FieldsToHeadersTransform<R> {

        @Override
        protected Schema operatingSchema(R record) {
            return record.valueSchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.value();
        }
    }

    /** Uniform read access to a record part, whether schemaless (Map) or schemaful (Struct). */
    public interface RecordValue {

        /** @return the value of the named field, or null when absent */
        Object getFieldValue(String fieldName);

        /** @return the schema of the named field, or null when the record is schemaless */
        Schema getFieldSchema(String fieldName);
    }

    // static: these adapters never touch the enclosing transform's state
    public static class MapRecordValue implements RecordValue {

        private final Map<String, Object> map;

        public MapRecordValue(Map<String, Object> map) {
            this.map = map;
        }

        @Override
        public Object getFieldValue(String fieldName) {
            // requireMapOrNull permits a null payload
            return map == null ? null : map.get(fieldName);
        }

        @Override
        public Schema getFieldSchema(String fieldName) {
            // schemaless records have no per-field schema
            return null;
        }
    }

    // static: these adapters never touch the enclosing transform's state
    public static class StructRecordValue implements RecordValue {

        private final Struct struct;

        private final Schema schema;

        public StructRecordValue(Struct struct, Schema schema) {
            this.struct = struct;
            this.schema = schema;
        }

        @Override
        public Object getFieldValue(String fieldName) {
            // requireStructOrNull permits a null payload; guard against NPE
            return struct == null ? null : struct.get(fieldName);
        }

        @Override
        public Schema getFieldSchema(String fieldName) {
            Field field = schema.field(fieldName);
            if (field == null) {
                throw new IllegalArgumentException("Unknown field: " + fieldName);
            }
            return field.schema();
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.kafkaconnector.transforms;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import org.apache.camel.util.function.TriConsumer;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import static org.apache.kafka.connect.data.Schema.BOOLEAN_SCHEMA;
import static org.apache.kafka.connect.data.Schema.BYTES_SCHEMA;
import static org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA;
import static org.apache.kafka.connect.data.Schema.INT32_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;
import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Tests for {@link FieldsToHeadersTransform}, covering the Key and Value variants,
 * schemaful (Struct) and schemaless (Map) payloads, configuration validation and
 * missing-field behavior.
 */
public class FieldsToHeadersTransformTest {

    private static List<String> fields = Arrays.asList("FROM", "TO", "CC", "SUBJECT", "BODY", "INT_EXAMPLE", "BYTE_EXAMPLE", "BOOLEAN_EXAMPLE", "FLOAT_EXAMPLE");
    private static List<String> headers = Arrays.asList("from", "to", "cc", "subject", "body", "int_example", "byte_example", "boolean_example", "float_example");

    @Test
    public void testKeyWithSchema() {
        FieldsToHeadersTransform.Key<SourceRecord> fieldsToHeadersTransform = new FieldsToHeadersTransform.Key<>();
        ConnectRecord<SourceRecord> transformedCr = testWithSchema(fieldsToHeadersTransform,
                (headerSchema, headerValue) -> new SourceRecord(null, null, "testTopic", headerSchema, headerValue, null, null));
        // the Key variant must leave the record value untouched
        assertNull(transformedCr.value());
        assertNull(transformedCr.valueSchema());
    }

    @Test
    public void testValueWithSchema() {
        FieldsToHeadersTransform.Value<SourceRecord> fieldsToHeadersTransform = new FieldsToHeadersTransform.Value<>();
        ConnectRecord<SourceRecord> transformedCr = testWithSchema(fieldsToHeadersTransform,
                (schema, value) -> new SourceRecord(null, null, "testTopic", schema, value));
        // the Value variant must leave the record key untouched
        assertNull(transformedCr.key());
        assertNull(transformedCr.keySchema());
    }

    @Test
    public void testWithoutSchema() {
        // schemaless records produce headers without a schema
        TriConsumer<ConnectRecord<?>, String, String> assertions = (record, headerName, expectedHeaderValue) -> {
            assertNull(record.keySchema());
            assertEquals("testTopic", record.topic());
            Iterator<Header> headerIterator = record.headers().allWithName(headerName);
            assertTrue(headerIterator.hasNext());
            Header header = headerIterator.next();
            assertEquals(expectedHeaderValue, header.value());
            assertNull(header.schema());
            assertFalse(headerIterator.hasNext());
        };

        Map<String, String> conf = new HashMap<>();
        conf.put("fields", "FROM,TO");
        conf.put("headers", "from,to");

        Map<String, String> message = new HashMap<>();
        message.put("FROM", "b...@example.com");
        message.put("TO", "al...@mail.com");

        // Test KEY
        FieldsToHeadersTransform.Key<SinkRecord> fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key<>();
        fieldsToHeadersTransformKey.configure(conf);

        final SinkRecord recordKey = new SinkRecord("testTopic", 0, null, message, null, null, 0);
        final ConnectRecord<SinkRecord> transformedRecordKey = fieldsToHeadersTransformKey.apply(recordKey);

        assertions.accept(transformedRecordKey, "from", message.get("FROM"));
        assertions.accept(transformedRecordKey, "to", message.get("TO"));

        // Test VALUE
        FieldsToHeadersTransform.Value<SinkRecord> fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value<>();
        fieldsToHeadersTransformValue.configure(conf);
        final SinkRecord recordValue = new SinkRecord("testTopic", 0, null, null, null, message, 0);
        final ConnectRecord<SinkRecord> transformedRecordValue = fieldsToHeadersTransformValue.apply(recordValue);

        assertions.accept(transformedRecordValue, "from", message.get("FROM"));
        assertions.accept(transformedRecordValue, "to", message.get("TO"));
    }

    @Test
    public void fieldsWithoutCorrespondingHeadersTest() {
        // a field with no corresponding header is a configuration error
        Map<String, String> conf = new HashMap<>();
        conf.put("fields", "FROM,TO,CC,SUBJECT,BODY");
        conf.put("headers", "from,to");
        // key
        final FieldsToHeadersTransform.Key<SinkRecord> fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key<>();
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            fieldsToHeadersTransformKey.configure(conf);
        });
        // value
        final FieldsToHeadersTransform<SinkRecord> fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value<>();
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            fieldsToHeadersTransformValue.configure(conf);
        });
    }

    @Test
    public void headersWithoutCorrespondingFieldsTest() {
        // extra headers are tolerated: configure() must only warn, not throw
        Map<String, String> conf = new HashMap<>();
        conf.put("fields", "FROM");
        conf.put("headers", "from,to,cc,subject,body");
        // key
        final FieldsToHeadersTransform.Key<SinkRecord> fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key<>();
        Assertions.assertDoesNotThrow(() -> fieldsToHeadersTransformKey.configure(conf));

        // value
        final FieldsToHeadersTransform<SinkRecord> fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value<>();
        Assertions.assertDoesNotThrow(() -> fieldsToHeadersTransformValue.configure(conf));
    }

    @Test
    public void missingFieldInTheSchemaKeyTest() {
        // a configured field absent from the schema must fail at apply() time
        FieldsToHeadersTransform.Key<SourceRecord> fieldsToHeadersTransform = new FieldsToHeadersTransform.Key<>();
        Schema keySchema = buildSchemaWithoutCC();
        Struct keyValue = buildValueWithoutCC(keySchema);

        fieldsToHeadersTransform.configure(buildConfig());
        SourceRecord record = new SourceRecord(null, null, "testTopic", keySchema, keyValue, null, null);

        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            fieldsToHeadersTransform.apply(record);
        });
    }

    @Test
    public void missingFieldInTheSchemaValueTest() {
        FieldsToHeadersTransform.Value<SourceRecord> fieldsToHeadersTransform = new FieldsToHeadersTransform.Value<>();
        Schema valueSchema = buildSchemaWithoutCC();
        Struct value = buildValueWithoutCC(valueSchema);

        fieldsToHeadersTransform.configure(buildConfig());
        SourceRecord record = new SourceRecord(null, null, "testTopic", valueSchema, value);

        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            fieldsToHeadersTransform.apply(record);
        });
    }

    @Test
    public void missingFieldInTheRecordKeyWithSchemaTest() {
        // field present in the schema (optional) but unset in the struct: header added with null value
        FieldsToHeadersTransform.Key<SourceRecord> fieldsToHeadersTransform = new FieldsToHeadersTransform.Key<>();
        Schema keySchema = buildSchema();
        Struct keyValue = buildValueWithoutCC(keySchema);

        fieldsToHeadersTransform.configure(buildConfig());
        SourceRecord record = new SourceRecord(null, null, "testTopic", keySchema, keyValue, null, null);

        ConnectRecord<SourceRecord> transformedRecord = fieldsToHeadersTransform.apply(record);
        Iterator<Header> headerIterator = transformedRecord.headers().allWithName("cc");
        assertTrue(headerIterator.hasNext());
        Header header = headerIterator.next();
        assertNotNull(header);
        assertNull(header.value());
        assertEquals(OPTIONAL_STRING_SCHEMA, header.schema());
        assertFalse(headerIterator.hasNext());
    }

    @Test
    public void missingFieldInTheRecordValueWithSchemaTest() {
        FieldsToHeadersTransform.Value<SourceRecord> fieldsToHeadersTransform = new FieldsToHeadersTransform.Value<>();
        Schema valueSchema = buildSchema();
        Struct value = buildValueWithoutCC(valueSchema);

        fieldsToHeadersTransform.configure(buildConfig());
        SourceRecord record = new SourceRecord(null, null, "testTopic", valueSchema, value);

        ConnectRecord<SourceRecord> transformedRecord = fieldsToHeadersTransform.apply(record);
        Iterator<Header> headerIterator = transformedRecord.headers().allWithName("cc");
        assertTrue(headerIterator.hasNext());
        Header header = headerIterator.next();
        assertNotNull(header);
        assertNull(header.value());
        assertEquals(OPTIONAL_STRING_SCHEMA, header.schema());
        assertFalse(headerIterator.hasNext());
    }

    @Test
    public void missingFieldWithoutSchema() {
        // schemaless record missing a configured field: header added with null value and null schema
        TriConsumer<ConnectRecord<?>, String, String> assertions = (record, headerName, expectedHeaderValue) -> {
            assertNull(record.keySchema());
            assertEquals("testTopic", record.topic());
            Iterator<Header> headerIterator = record.headers().allWithName(headerName);
            assertTrue(headerIterator.hasNext());
            Header header = headerIterator.next();
            assertEquals(expectedHeaderValue, header.value());
            assertNull(header.schema());
            assertFalse(headerIterator.hasNext());
        };

        Map<String, String> conf = new HashMap<>();
        conf.put("fields", "FROM,TO,CC");
        conf.put("headers", "from,to,cc");

        Map<String, String> message = new HashMap<>();
        message.put("FROM", "b...@example.com");
        message.put("TO", "al...@mail.com");

        // Test KEY
        FieldsToHeadersTransform.Key<SinkRecord> fieldsToHeadersTransformKey = new FieldsToHeadersTransform.Key<>();
        fieldsToHeadersTransformKey.configure(conf);

        final SinkRecord recordKey = new SinkRecord("testTopic", 0, null, message, null, null, 0);
        final ConnectRecord<SinkRecord> transformedRecordKey = fieldsToHeadersTransformKey.apply(recordKey);

        assertions.accept(transformedRecordKey, "from", message.get("FROM"));
        assertions.accept(transformedRecordKey, "to", message.get("TO"));
        Iterator<Header> headerIterator = transformedRecordKey.headers().allWithName("cc");
        assertTrue(headerIterator.hasNext());
        Header header = headerIterator.next();
        assertNotNull(header);
        assertNull(header.value());
        assertFalse(headerIterator.hasNext());

        // Test VALUE
        FieldsToHeadersTransform.Value<SinkRecord> fieldsToHeadersTransformValue = new FieldsToHeadersTransform.Value<>();
        fieldsToHeadersTransformValue.configure(conf);
        final SinkRecord recordValue = new SinkRecord("testTopic", 0, null, null, null, message, 0);
        final ConnectRecord<SinkRecord> transformedRecordValue = fieldsToHeadersTransformValue.apply(recordValue);

        assertions.accept(transformedRecordValue, "from", message.get("FROM"));
        assertions.accept(transformedRecordValue, "to", message.get("TO"));

        // BUGFIX: the original re-checked transformedRecordKey here, leaving the
        // value path unverified; check the value-transformed record instead.
        headerIterator = transformedRecordValue.headers().allWithName("cc");
        assertTrue(headerIterator.hasNext());
        header = headerIterator.next();
        assertNotNull(header);
        assertNull(header.value());
        assertFalse(headerIterator.hasNext());
    }

    /** Schema lacking the CC field, used to trigger unknown-field errors. */
    private static Schema buildSchemaWithoutCC() {
        return SchemaBuilder.struct()
                .field("FROM", STRING_SCHEMA)
                .field("TO", STRING_SCHEMA)
                .field("SUBJECT", STRING_SCHEMA)
                .field("BODY", STRING_SCHEMA)
                .field("INT_EXAMPLE", INT32_SCHEMA)
                .field("BYTE_EXAMPLE", BYTES_SCHEMA)
                .field("BOOLEAN_EXAMPLE", BOOLEAN_SCHEMA)
                .field("FLOAT_EXAMPLE", FLOAT32_SCHEMA).build();
    }

    /** Struct with every field of {@code schema} populated except CC. */
    private static Struct buildValueWithoutCC(Schema schema) {
        byte[] attachment = new byte[32];
        new Random().nextBytes(attachment);
        return new Struct(schema)
                .put("FROM", "b...@example.com")
                .put("TO", "al...@mail.com")
                .put("SUBJECT", "Needs Attention")
                .put("BODY", "there is an issue that needs your attention")
                .put("INT_EXAMPLE", 34)
                .put("BYTE_EXAMPLE", attachment)
                .put("BOOLEAN_EXAMPLE", true)
                .put("FLOAT_EXAMPLE", 34.5F);
    }

    /** Full schema, with CC optional so it may legitimately be unset in a record. */
    private static Schema buildSchema() {
        return SchemaBuilder.struct()
                .field("FROM", STRING_SCHEMA)
                .field("TO", STRING_SCHEMA)
                .field("CC", OPTIONAL_STRING_SCHEMA)
                .field("SUBJECT", STRING_SCHEMA)
                .field("BODY", STRING_SCHEMA)
                .field("INT_EXAMPLE", INT32_SCHEMA)
                .field("BYTE_EXAMPLE", BYTES_SCHEMA)
                .field("BOOLEAN_EXAMPLE", BOOLEAN_SCHEMA)
                .field("FLOAT_EXAMPLE", FLOAT32_SCHEMA).build();
    }

    /** Struct with every field of {@code schema} populated. */
    private static Struct buildValue(Schema schema) {
        byte[] attachment = new byte[32];
        new Random().nextBytes(attachment);
        return new Struct(schema)
                .put("FROM", "b...@example.com")
                .put("TO", "al...@mail.com")
                .put("CC", "manag...@enterprise.com")
                .put("SUBJECT", "Needs Attention")
                .put("BODY", "there is an issue that needs your attention")
                .put("INT_EXAMPLE", 34)
                .put("BYTE_EXAMPLE", attachment)
                .put("BOOLEAN_EXAMPLE", true)
                .put("FLOAT_EXAMPLE", 34.5F);
    }

    /** Configuration mapping every test field to its lower-case header name. */
    private static Map<String, String> buildConfig() {
        Map<String, String> map = new HashMap<>();
        map.put("fields", fields.stream().collect(Collectors.joining(",")));
        map.put("headers", headers.stream().collect(Collectors.joining(",")));
        return map;
    }

    /**
     * Shared schemaful scenario: builds a full record via {@code createRecord},
     * applies the transform and asserts one header per configured field with the
     * matching value and schema.
     *
     * @return the transformed record for variant-specific assertions
     */
    public ConnectRecord<SourceRecord> testWithSchema(FieldsToHeadersTransform<SourceRecord> fieldsToHeadersTransform,
                                                      BiFunction<Schema, Struct, SourceRecord> createRecord) {
        Schema valueSchema = buildSchema();

        Struct value = buildValue(valueSchema);
        fieldsToHeadersTransform.configure(buildConfig());
        SourceRecord record = createRecord.apply(valueSchema, value);
        ConnectRecord<SourceRecord> transformedCr = fieldsToHeadersTransform.apply(record);

        assertEquals("testTopic", transformedCr.topic());
        Iterator<Header> headerIterator;
        Header header;
        for (int i = 0; i < fields.size(); i++) {
            headerIterator = transformedCr.headers().allWithName(headers.get(i));
            assertTrue(headerIterator.hasNext());
            header = headerIterator.next();
            assertEquals(value.get(fields.get(i)), header.value());
            assertEquals(valueSchema.field(fields.get(i)).schema(), header.schema());
            assertFalse(headerIterator.hasNext());
        }
        return transformedCr;
    }
}