This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 43ef44d FieldsToHeadersTransform: added option to copy whole value/key related to issue #988 43ef44d is described below commit 43ef44de8130d9878131163c8cee021df4dadddd Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Wed Apr 7 15:39:29 2021 +0200 FieldsToHeadersTransform: added option to copy whole value/key related to issue #988 --- .../transforms/FieldsToHeadersTransform.java | 54 +++++++++++++------ .../transforms/FieldsToHeadersTransformTest.java | 62 +++++++++++++++++++++- .../ROOT/pages/single-message-transform.adoc | 8 +-- .../ROOT/pages/transformers/fieldsToHeaders.adoc | 29 +++++++++- 4 files changed, 132 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java index ed74886..72c8f5f 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransform.java @@ -43,30 +43,32 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple private static final String HEADERS_CONFIG = "headers"; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Fields names to extract and set to headers") + .define(FIELDS_CONFIG, ConfigDef.Type.LIST, new ArrayList<>(), ConfigDef.Importance.MEDIUM, "Fields names to extract and set to headers") .define(HEADERS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Headers names to set with extracted fields"); - private List<String> fields; private List<String> headers; - - protected abstract Schema operatingSchema(R record); protected abstract Object operatingValue(R record); - @Override public R apply(R r) { RecordValue value = createRecordValue(r); Schema currentSchema; Object currentValue; - for (int i = 0; i < fields.size(); i++) { - currentSchema = value.getFieldSchema(fields.get(i)); - currentValue = value.getFieldValue(fields.get(i)); - r.headers().add(headers.get(i), currentValue, currentSchema); + if (fields.isEmpty()) { + currentSchema = value.getFieldSchema(""); + currentValue = value.getFieldValue(""); + r.headers().add(headers.get(0), currentValue, currentSchema); + } else { + for (int i = 0; i < fields.size(); i++) { + currentSchema = value.getFieldSchema(fields.get(i)); + currentValue = value.getFieldValue(fields.get(i)); + r.headers().add(headers.get(i), currentValue, currentSchema); + } } return r; } @@ -90,26 +92,30 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple private void validateConfig() { - boolean validFields = fields.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty()); + boolean validFields = fields.stream().allMatch(nef -> nef != null); boolean validHeaders = headers.stream().allMatch(nef -> nef != null && !nef.trim().isEmpty()); if (!(validFields && validHeaders)) { - throw new IllegalArgumentException("headers and fields configuration properties cannot be null or contain empty elements."); + throw new IllegalArgumentException("fields configuration property cannot be null (can be an empty string if you want the whole value/key), headers configuration property cannot be null or contain empty elements."); } - if (fields.size() > headers.size()) { + if (fields.size() != 0 && fields.size() > headers.size()) { String fieldsWithoutCorrespondingHeaders = fields.subList(headers.size(), fields.size()).stream().collect(Collectors.joining(",")); throw new IllegalArgumentException("There is no corresponding header(s) configured for the following field(s): " + fieldsWithoutCorrespondingHeaders); } - if (headers.size() > fields.size()) { + if (fields.size() != 0 && headers.size() > fields.size()) { String headersWithoutCorrespondingFields = headers.subList(fields.size(), headers.size()).stream().collect(Collectors.joining(",")); LOG.warn("There is no corresponding header(s) for the following field(s): {} ", headersWithoutCorrespondingFields); } - + if (fields.size() == 0 && headers.size() > 1) { + LOG.warn("Fields are empty and there are more than 1 header it means whole value/key will put in the first header of this list: {} ", headers.stream().collect(Collectors.joining(","))); + } } - private RecordValue createRecordValue(R r) { final Schema schema = operatingSchema(r); + if (fields.isEmpty()) { + return new WholeRecordValue(operatingValue(r), schema); + } if (schema == null) { return new MapRecordValue(requireMapOrNull(operatingValue(r), PURPOSE)); } @@ -149,6 +155,24 @@ public abstract class FieldsToHeadersTransform<R extends ConnectRecord<R>> imple Schema getFieldSchema(String fieldName); } + public class WholeRecordValue implements RecordValue { + private Object value; + private Schema schema; + + public WholeRecordValue(Object value, Schema schema) { + this.value = value; + this.schema = schema; + } + + public Object getFieldValue(String fieldName) { + return value; + } + + public Schema getFieldSchema(String fieldName) { + return schema; + } + } + public class MapRecordValue implements RecordValue { private Map<String, Object> map; diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java index a954124..1fadf91 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/FieldsToHeadersTransformTest.java @@ -62,6 +62,36 @@ public class FieldsToHeadersTransformTest { } @Test + public void testWholeKey() { + FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key(); + Map<String, String> conf = new HashMap<>(); + conf.put("headers", "camel.kafka.KEY"); + fieldsToHeadersTransform.configure(conf); + ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, 100, null, null)); + assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value()); + } + + @Test + public void testWholeKeyWithSchema() { + FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key(); + Map<String, String> conf = new HashMap<>(); + conf.put("headers", "camel.kafka.KEY"); + fieldsToHeadersTransform.configure(conf); + ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", INT32_SCHEMA, 100, null, null)); + assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value()); + } + + @Test + public void testWholeKeyMultipleHeaders() { + FieldsToHeadersTransform.Key fieldsToHeadersTransform = new FieldsToHeadersTransform.Key(); + Map<String, String> conf = new HashMap<>(); + conf.put("headers", "camel.kafka.KEY,should.not.be.set"); + fieldsToHeadersTransform.configure(conf); + ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, 100, null, null)); + assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value()); + } + + @Test public void testValueWithSchema() { FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value(); ConnectRecord transformedCr = testWithSchema(fieldsToHeadersTransform, (schema, value) -> new SourceRecord(null, null, "testTopic", schema, value)); @@ -112,6 +142,36 @@ public class FieldsToHeadersTransformTest { } @Test + public void testWholeValue() { + FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value(); + Map<String, String> conf = new HashMap<>(); + conf.put("headers", "camel.kafka.KEY"); + fieldsToHeadersTransform.configure(conf); + ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, null, 100)); + assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value()); + } + + @Test + public void testWholeValueMultipleHeaders() { + FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value(); + Map<String, String> conf = new HashMap<>(); + conf.put("headers", "camel.kafka.KEY,should.not.be.set"); + fieldsToHeadersTransform.configure(conf); + ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, null, 100)); + assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value()); + } + + @Test + public void testWholeValueWithSchema() { + FieldsToHeadersTransform.Value fieldsToHeadersTransform = new FieldsToHeadersTransform.Value(); + Map<String, String> conf = new HashMap<>(); + conf.put("headers", "camel.kafka.KEY"); + fieldsToHeadersTransform.configure(conf); + ConnectRecord transformedCr = fieldsToHeadersTransform.apply(new SourceRecord(null, null, "testTopic", null, null, INT32_SCHEMA, 100)); + assertEquals(100, transformedCr.headers().lastWithName("camel.kafka.KEY").value()); + } + + @Test public void fieldsWithoutCorrespondingHeadersTest() { Map<String, String> conf = new HashMap<>(); conf.put("fields", "FROM,TO,CC,SUBJECT,BODY"); @@ -130,7 +190,7 @@ public class FieldsToHeadersTransformTest { } @Test - public void headersWithoutCorrespondingFieldssTest() { + public void headersWithoutCorrespondingFieldsTest() { Map<String, String> conf = new HashMap<>(); conf.put("fields", "FROM"); conf.put("headers", "from,to,cc,subject,body"); diff --git a/docs/modules/ROOT/pages/single-message-transform.adoc b/docs/modules/ROOT/pages/single-message-transform.adoc index 1b2a0a8..4ed18af 100644 --- a/docs/modules/ROOT/pages/single-message-transform.adoc +++ b/docs/modules/ROOT/pages/single-message-transform.adoc @@ -10,14 +10,14 @@ SMTs transform inbound messages after a source connector has produced them, but before they are written to Kafka. SMTs transform outbound messages before they are sent to a sink connector. The following SMTs are available for use with Kafka Connect. -In addition to the https://docs.confluent.io/current/connect/transforms/index.html?_ga=2.114132679.1300749793.1612453936-340332144.1608111036&_gac=1.228771694.1611682559.CjwKCAiAxp-ABhALEiwAXm6IyeFH3rDEIpDWXGQaoibWgcjXxdz8YLkAbi3n8O-quiuWzdAeNezaqhoCdmgQAvD_BwE[prebuilt transformations]#,# -Camel Kafka Connector provides additionals SMTs: +In addition to the https://kafka.apache.org/documentation/#connect_transforms[prebuilt transformations], +Camel Kafka Connector provides additional SMTs: [cols="^,^ ", options="header"] |=== |Transform | Description |xref:transformers/fieldsToHeaders.adoc[FieldsToHeaders] -|Extract fields from a configured record value (struct, schema and map are supported) -and copy the value in configured headers +|Extract fields from a kafka connect record's value or key and copy the value in configured headers (struct + schema, map or copy the whole value/key are the supported options) + |=== diff --git a/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc b/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc index 4ebc2d1..2fbd0c2 100644 --- a/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc +++ b/docs/modules/ROOT/pages/transformers/fieldsToHeaders.adoc @@ -17,6 +17,8 @@ field 'a' goes to header 'X', field 'b' goes to header 'Y' etc... Any null values are passed through unmodified. +If `transforms.fieldsToHeadersTransform.fields` is empty or omitted the whole value will be copied in the first `transforms.fieldsToHeadersTransform.headers`. + Use the concrete transformation type designed for the record key (`org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Key`) or value (`org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Value`). @@ -25,6 +27,8 @@ Use the concrete transformation type designed for the record key The following examples show how to use FieldsToHeaders. +=== Given the following configuration: + [source,java-properties] ---- transforms=FieldsToHeadersTransform @@ -32,7 +36,7 @@ transforms.fieldsToHeadersTransform.type=org.apache.camel.kafkaconnector.transfo transforms.fieldsToHeadersTransform.fields=FROM,TO,CC,SUBJECT transforms.fieldsToHeadersTransform.headers=from,to,cc,subject ---- -Given the following message: +and the following message: [source,json] ---- @@ -58,6 +62,29 @@ subject: "Needs Attention!" body: "Tere is an issue that needs your attention!" ---- +=== Given the following configuration: + +[source,java-properties] +---- +transforms=FieldsToHeadersTransform +transforms.fieldsToHeadersTransform.type=org.apache.camel.kafkaconnector.transforms.FieldsToHeadersTransform$Key +transforms.fieldsToHeadersTransform.headers=camel.kafka.KEY +---- +and the following message: + +[source] +---- +kafka record with: + value: "my message" + key: 100 +---- +the following headers will be added to the Kafka ConnectRecord object: + +[source] +---- + camel.kafka.KEY: 100 +---- + == Properties [cols="^,^ ", options="header"]