This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-21249
in repository https://gitbox.apache.org/repos/asf/camel.git

commit aac369d2b793aed25a3edde36a7a3882a89ee03e
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Oct 10 15:49:23 2024 +0200

    CAMEL-21249 - Camel-Kamelets: Move Kamelets utils in Camel Kamelets 
component
    
    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 .../apache/camel/catalog/transformers.properties   |   1 +
 .../aws2-ddb-application-x-struct.json             |  14 +
 components/camel-kamelet/pom.xml                   |  48 ++++
 .../org/apache/camel/transformer.properties        |   7 +
 .../transformer/aws2-ddb-application-x-struct      |   2 +
 .../transformer/aws2-ddb-application-x-struct.json |  14 +
 .../component/kamelet/utils/djl/ImageNetUtil.java  |  57 ++++
 .../component/kamelet/utils/format/MimeType.java   |  55 ++++
 .../format/schema/DelegatingSchemaResolver.java    | 121 +++++++++
 .../kamelet/utils/mongodb/SslAwareMongoClient.java | 291 +++++++++++++++++++++
 .../gson/JavaTimeInstantTypeAdapter.java           |  41 +++
 .../kafka/KafkaHeaderDeserializer.java             |  98 +++++++
 .../kamelet/utils/transform/DropField.java         |  60 +++++
 .../kamelet/utils/transform/ExtractField.java      | 112 ++++++++
 .../kamelet/utils/transform/HoistField.java        |  38 +++
 .../kamelet/utils/transform/InsertField.java       |  85 ++++++
 .../kamelet/utils/transform/MaskField.java         | 125 +++++++++
 .../utils/transform/MessageTimestampRouter.java    |  90 +++++++
 .../kamelet/utils/transform/RegexRouter.java       |  50 ++++
 .../kamelet/utils/transform/ReplaceField.java      |  90 +++++++
 .../kamelet/utils/transform/TimestampRouter.java   |  69 +++++
 .../ddb/Ddb2JsonStructDataTypeTransformer.java     |  46 ++++
 .../utils/transform/kafka/BatchManualCommit.java   |  42 +++
 .../utils/transform/kafka/ManualCommit.java        |  33 +++
 .../kamelet/utils/transform/kafka/ValueToKey.java  |  57 ++++
 .../utils/kafka/KafkaHeaderDeserializerTest.java   |  78 ++++++
 .../kamelet/utils/transform/ExtractFieldTest.java  | 133 ++++++++++
 .../kamelet/utils/transform/HoistFieldTest.java    |  58 ++++
 .../kamelet/utils/transform/InsertFieldTest.java   |  73 ++++++
 .../kamelet/utils/transform/MaskFieldTest.java     |  86 ++++++
 .../kamelet/utils/transform/RegexRouterTest.java   |  51 ++++
 .../kamelet/utils/transform/ReplaceFieldTest.java  | 115 ++++++++
 32 files changed, 2240 insertions(+)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
index 4efe6eae08f..b740fa67e2c 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
@@ -8,6 +8,7 @@ avro-x-java-object
 avro-x-struct
 aws-cloudtrail-application-cloudevents
 aws2-ddb-application-json
+aws2-ddb-application-x-struct
 aws2-ddbstream-application-cloudevents
 aws2-kinesis-application-cloudevents
 aws2-s3-application-cloudevents
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json
new file mode 100644
index 00000000000..d826347d326
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json
@@ -0,0 +1,14 @@
+{
+  "transformer": {
+    "kind": "transformer",
+    "name": "aws2-ddb:application-x-struct",
+    "title": "Aws2 Ddb (Application X Struct)",
+    "description": "Transforms DynamoDB record into a Json node",
+    "deprecated": false,
+    "javaType": 
"org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-kamelet",
+    "version": "4.9.0-SNAPSHOT"
+  }
+}
+
diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml
index e7635a793ba..7473eabff03 100644
--- a/components/camel-kamelet/pom.xml
+++ b/components/camel-kamelet/pom.xml
@@ -43,6 +43,54 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core-model</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-djl</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.extjwnl</groupId>
+            <artifactId>extjwnl</artifactId>
+            <version>2.0.5</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.extjwnl</groupId>
+            <artifactId>extjwnl-data-wn31</artifactId>
+            <version>1.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jackson</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jackson-avro</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jackson-protobuf</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-mongodb</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-gson</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
 
         <!-- TESTS  -->
         <dependency>
diff --git 
a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties
 
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties
new file mode 100644
index 00000000000..3d2567b0cb8
--- /dev/null
+++ 
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+transformers=aws2-ddb:application-x-struct
+groupId=org.apache.camel
+artifactId=camel-kamelet
+version=4.9.0-SNAPSHOT
+projectName=Camel :: Kamelet
+projectDescription=To call Kamelets
diff --git 
a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
 
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
new file mode 100644
index 00000000000..4db224fcbbe
--- /dev/null
+++ 
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer
diff --git 
a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
 
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
new file mode 100644
index 00000000000..d826347d326
--- /dev/null
+++ 
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
@@ -0,0 +1,14 @@
+{
+  "transformer": {
+    "kind": "transformer",
+    "name": "aws2-ddb:application-x-struct",
+    "title": "Aws2 Ddb (Application X Struct)",
+    "description": "Transforms DynamoDB record into a Json node",
+    "deprecated": false,
+    "javaType": 
"org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-kamelet",
+    "version": "4.9.0-SNAPSHOT"
+  }
+}
+
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java
new file mode 100644
index 00000000000..7f4416deef9
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.djl;
+
+import java.util.List;
+
+import ai.djl.modality.Classifications;
+import net.sf.extjwnl.data.IndexWord;
+import net.sf.extjwnl.data.POS;
+import net.sf.extjwnl.data.PointerUtils;
+import net.sf.extjwnl.data.list.PointerTargetNodeList;
+import net.sf.extjwnl.dictionary.Dictionary;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * A utility bean class for handling ImageNet (https://image-net.org/) 
classifications.
+ */
+public class ImageNetUtil {
+
+    public ImageNetUtil() {
+    }
+
+    public void extractClassName(Exchange exchange) {
+        Classifications body = 
exchange.getMessage().getBody(Classifications.class);
+        String className = body.best().getClassName().split(",")[0].split(" ", 
2)[1];
+        exchange.getMessage().setBody(className);
+    }
+
+    public void addHypernym(Exchange exchange) throws Exception {
+        String className = exchange.getMessage().getBody(String.class);
+        Dictionary dic = Dictionary.getDefaultResourceInstance();
+        IndexWord word = dic.getIndexWord(POS.NOUN, className);
+        if (word == null) {
+            throw new RuntimeCamelException("Word not found: " + className);
+        }
+        PointerTargetNodeList hypernyms = 
PointerUtils.getDirectHypernyms(word.getSenses().get(0));
+        String hypernym = hypernyms.stream()
+                .map(h -> h.getSynset().getWords().get(0).getLemma())
+                .findFirst().orElse(className);
+        exchange.getMessage().setBody(List.of(className, hypernym));
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java
new file mode 100644
index 00000000000..aa16d5300c0
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.format;
+
+import java.util.Objects;
+
+public enum MimeType {
+    JSON("application/json"),
+    PROTOBUF("application/protobuf"),
+    PROTOBUF_BINARY("protobuf/binary"),
+    PROTOBUF_STRUCT("protobuf/x-struct"),
+    AVRO("application/avro"),
+    AVRO_BINARY("avro/binary"),
+    AVRO_STRUCT("avro/x-struct"),
+    BINARY("application/octet-stream"),
+    TEXT("text/plain"),
+    JAVA_OBJECT("application/x-java-object"),
+    STRUCT("application/x-struct");
+
+    private static final MimeType[] VALUES = values();
+    private final String type;
+
+    MimeType(String type) {
+        this.type = type;
+    }
+
+    public String type() {
+        return type;
+    }
+
+    public static MimeType of(String type) {
+        for (MimeType mt : VALUES) {
+            if (Objects.equals(type, mt.type)) {
+                return mt;
+            }
+        }
+
+        throw new IllegalArgumentException("Unsupported type: " + type);
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java
new file mode 100644
index 00000000000..aa317448fbb
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.format.schema;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jackson.avro.transform.AvroSchemaResolver;
+import 
org.apache.camel.component.jackson.protobuf.transform.ProtobufSchemaResolver;
+import org.apache.camel.component.jackson.transform.JsonSchemaResolver;
+import org.apache.camel.component.kamelet.utils.format.MimeType;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Schema resolver processor delegates to either Avro or Json schema resolver 
based on the given mimetype property. When
+ * mimetype is of type application/x-java-object uses additional target 
mimetype (usually the produces mimetype) to
+ * determine the schema resolver (Avro or Json). Delegates to schema resolver 
and sets proper content class and schema
+ * properties on the delegate.
+ */
+public class DelegatingSchemaResolver implements Processor {
+    private String mimeType;
+    private String targetMimeType;
+
+    private String schema;
+    private String contentClass;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (ObjectHelper.isEmpty(mimeType)) {
+            return;
+        }
+
+        MimeType mimeType = MimeType.of(this.mimeType);
+        Processor resolver;
+        if (mimeType.equals(MimeType.JAVA_OBJECT)) {
+            if (ObjectHelper.isEmpty(targetMimeType)) {
+                return;
+            }
+            resolver = fromMimeType(MimeType.of(targetMimeType));
+        } else {
+            resolver = fromMimeType(mimeType);
+        }
+
+        if (resolver != null) {
+            resolver.process(exchange);
+        }
+    }
+
+    private Processor fromMimeType(MimeType mimeType) {
+        switch (mimeType) {
+            case PROTOBUF:
+            case PROTOBUF_BINARY:
+            case PROTOBUF_STRUCT:
+                ProtobufSchemaResolver protobufSchemaResolver = new 
ProtobufSchemaResolver();
+                protobufSchemaResolver.setSchema(this.schema);
+                protobufSchemaResolver.setContentClass(this.contentClass);
+                return protobufSchemaResolver;
+            case AVRO:
+            case AVRO_BINARY:
+            case AVRO_STRUCT:
+                AvroSchemaResolver avroSchemaResolver = new 
AvroSchemaResolver();
+                avroSchemaResolver.setSchema(this.schema);
+                avroSchemaResolver.setContentClass(this.contentClass);
+                return avroSchemaResolver;
+            case JSON:
+            case STRUCT:
+                JsonSchemaResolver jsonSchemaResolver = new 
JsonSchemaResolver();
+                jsonSchemaResolver.setSchema(this.schema);
+                jsonSchemaResolver.setContentClass(this.contentClass);
+                return jsonSchemaResolver;
+            default:
+                return null;
+        }
+    }
+
+    public String getMimeType() {
+        return mimeType;
+    }
+
+    public void setMimeType(String mimeType) {
+        this.mimeType = mimeType;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    public void setContentClass(String contentClass) {
+        this.contentClass = contentClass;
+    }
+
+    public String getTargetMimeType() {
+        return targetMimeType;
+    }
+
+    public void setTargetMimeType(String targetMimeType) {
+        this.targetMimeType = targetMimeType;
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java
new file mode 100644
index 00000000000..d7b3819fb1d
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.mongodb;
+
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import com.mongodb.*;
+import com.mongodb.client.*;
+import com.mongodb.connection.ClusterDescription;
+import org.apache.camel.util.function.Suppliers;
+import org.bson.Document;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SslAwareMongoClient implements MongoClient {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SslAwareMongoClient.class);
+    private static final TrustManager[] trustAllCerts = new TrustManager[] {
+            new X509TrustManager() {
+                public X509Certificate[] getAcceptedIssuers() {
+                    return null;
+                }
+
+                @Override
+                public void checkClientTrusted(X509Certificate[] arg0, String 
arg1)
+                        throws CertificateException {
+                }
+
+                @Override
+                public void checkServerTrusted(X509Certificate[] arg0, String 
arg1)
+                        throws CertificateException {
+                }
+            }
+    };
+    private final Supplier<MongoClient> wrappedMongoClient = 
Suppliers.memorize(new Supplier<MongoClient>() {
+        @Override
+        public MongoClient get() {
+            String credentials = username == null ? "" : username;
+
+            if (!credentials.isEmpty()) {
+                credentials += password == null ? "@" : ":" + password + "@";
+            }
+
+            MongoClientSettings settings = MongoClientSettings.builder()
+                    .applyToSslSettings(builder -> {
+                        builder.enabled(ssl);
+                        if (!sslValidationEnabled) {
+                            builder.invalidHostNameAllowed(true);
+                            SSLContext sc = null;
+                            try {
+                                sc = SSLContext.getInstance("TLSv1.2");
+                            } catch (NoSuchAlgorithmException e) {
+                                throw new RuntimeException("Error 
instantiating trust all SSL context.", e);
+                            }
+                            try {
+                                sc.init(null, trustAllCerts, new 
java.security.SecureRandom());
+                            } catch (KeyManagementException e) {
+                                throw new RuntimeException("Error 
instantiating trust all SSL context.", e);
+                            }
+                            builder.context(sc);
+                        }
+                    })
+                    .applyConnectionString(new 
ConnectionString(String.format("mongodb://%s%s", credentials, hosts)))
+                    .build();
+            LOG.info("Connection created using provided credentials");
+            return MongoClients.create(settings);
+        }
+    });
+    private String hosts = null;
+    private String username = null;
+    private String password = null;
+    private boolean ssl = true;
+
+    private boolean sslValidationEnabled = true;
+
+    public MongoClient getWrappedMongoClient() {
+        return wrappedMongoClient.get();
+    }
+
+    @Override
+    public MongoDatabase getDatabase(String s) {
+        return getWrappedMongoClient().getDatabase(s);
+    }
+
+    @Override
+    public ClientSession startSession() {
+        return getWrappedMongoClient().startSession();
+    }
+
+    @Override
+    public ClientSession startSession(ClientSessionOptions 
clientSessionOptions) {
+        return getWrappedMongoClient().startSession(clientSessionOptions);
+    }
+
+    @Override
+    public void close() {
+        getWrappedMongoClient().close();
+    }
+
+    @Override
+    public MongoIterable<String> listDatabaseNames() {
+        return getWrappedMongoClient().listDatabaseNames();
+    }
+
+    @Override
+    public MongoIterable<String> listDatabaseNames(ClientSession 
clientSession) {
+        return getWrappedMongoClient().listDatabaseNames(clientSession);
+    }
+
+    @Override
+    public ListDatabasesIterable<Document> listDatabases() {
+        return getWrappedMongoClient().listDatabases();
+    }
+
+    @Override
+    public ListDatabasesIterable<Document> listDatabases(ClientSession 
clientSession) {
+        return getWrappedMongoClient().listDatabases(clientSession);
+    }
+
+    @Override
+    public <TResult> ListDatabasesIterable<TResult> 
listDatabases(Class<TResult> aClass) {
+        return getWrappedMongoClient().listDatabases(aClass);
+    }
+
+    @Override
+    public <TResult> ListDatabasesIterable<TResult> 
listDatabases(ClientSession clientSession, Class<TResult> aClass) {
+        return getWrappedMongoClient().listDatabases(clientSession, aClass);
+    }
+
+    @Override
+    public ChangeStreamIterable<Document> watch() {
+        return getWrappedMongoClient().watch();
+    }
+
+    @Override
+    public <TResult> ChangeStreamIterable<TResult> watch(Class<TResult> 
aClass) {
+        return getWrappedMongoClient().watch(aClass);
+    }
+
+    @Override
+    public ChangeStreamIterable<Document> watch(List<? extends Bson> list) {
+        return getWrappedMongoClient().watch(list);
+    }
+
+    @Override
+    public <TResult> ChangeStreamIterable<TResult> watch(List<? extends Bson> 
list, Class<TResult> aClass) {
+        return getWrappedMongoClient().watch(list, aClass);
+    }
+
+    @Override
+    public ChangeStreamIterable<Document> watch(ClientSession clientSession) {
+        return getWrappedMongoClient().watch(clientSession);
+    }
+
+    @Override
+    public <TResult> ChangeStreamIterable<TResult> watch(ClientSession 
clientSession, Class<TResult> aClass) {
+        return getWrappedMongoClient().watch(clientSession, aClass);
+    }
+
+    @Override
+    public ChangeStreamIterable<Document> watch(ClientSession clientSession, 
List<? extends Bson> list) {
+        return getWrappedMongoClient().watch(clientSession, list);
+    }
+
+    @Override
+    public <TResult> ChangeStreamIterable<TResult> watch(
+            ClientSession clientSession, List<? extends Bson> list,
+            Class<TResult> aClass) {
+        return getWrappedMongoClient().watch(clientSession, list, aClass);
+    }
+
+    @Override
+    public ClusterDescription getClusterDescription() {
+        return getWrappedMongoClient().getClusterDescription();
+    }
+
+    @Override
+    public CodecRegistry getCodecRegistry() {
+        return getWrappedMongoClient().getCodecRegistry();
+    }
+
+    @Override
+    public ReadPreference getReadPreference() {
+        return getWrappedMongoClient().getReadPreference();
+    }
+
+    @Override
+    public WriteConcern getWriteConcern() {
+        return getWrappedMongoClient().getWriteConcern();
+    }
+
+    @Override
+    public ReadConcern getReadConcern() {
+        return getWrappedMongoClient().getReadConcern();
+    }
+
+    @Override
+    public Long getTimeout(TimeUnit timeUnit) {
+        return getWrappedMongoClient().getTimeout(timeUnit);
+    }
+
+    @Override
+    public MongoCluster withCodecRegistry(CodecRegistry codecRegistry) {
+        return getWrappedMongoClient().withCodecRegistry(codecRegistry);
+    }
+
+    @Override
+    public MongoCluster withReadPreference(ReadPreference readPreference) {
+        return getWrappedMongoClient().withReadPreference(readPreference);
+    }
+
+    @Override
+    public MongoCluster withWriteConcern(WriteConcern writeConcern) {
+        return getWrappedMongoClient().withWriteConcern(writeConcern);
+    }
+
+    @Override
+    public MongoCluster withReadConcern(ReadConcern readConcern) {
+        return getWrappedMongoClient().withReadConcern(readConcern);
+    }
+
+    @Override
+    public MongoCluster withTimeout(long l, TimeUnit timeUnit) {
+        return getWrappedMongoClient().withTimeout(l, timeUnit);
+    }
+
+    public String getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public boolean isSsl() {
+        return ssl;
+    }
+
+    public void setSsl(boolean ssl) {
+        this.ssl = ssl;
+    }
+
+    public boolean isSslValidationEnabled() {
+        return sslValidationEnabled;
+    }
+
+    public void setSslValidationEnabled(boolean sslValidationEnabled) {
+        this.sslValidationEnabled = sslValidationEnabled;
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
new file mode 100644
index 00000000000..6e062fc4c27
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.serialization.gson;
+
+import java.lang.reflect.Type;
+import java.time.Instant;
+
+import com.google.gson.*;
+
+public class JavaTimeInstantTypeAdapter implements JsonSerializer<Instant>, 
JsonDeserializer<Instant> {
+
+    @Override
+    public JsonElement serialize(
+            final Instant time, final Type typeOfSrc,
+            final JsonSerializationContext context) {
+        return new JsonPrimitive(time.getEpochSecond() * 1000);
+    }
+
+    @Override
+    public Instant deserialize(
+            final JsonElement json, final Type typeOfT,
+            final JsonDeserializationContext context)
+            throws JsonParseException {
+        return Instant.ofEpochMilli(json.getAsLong());
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
new file mode 100644
index 00000000000..6cb8f2f1b02
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.serialization.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.support.SimpleTypeConverter;
+
+/**
+ * Header deserializer used in Kafka source Kamelet. Automatically converts 
all message headers to String. Uses given
+ * type converter implementation set on the Camel context to convert values. 
If no type converter is set the
+ * deserializer uses its own fallback conversion implementation.
+ */
+public class KafkaHeaderDeserializer implements Processor {
+
+    public boolean enabled = false;
+
+    private final SimpleTypeConverter defaultTypeConverter = new 
SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (!enabled) {
+            return;
+        }
+
+        Map<String, Object> headers = exchange.getMessage().getHeaders();
+
+        TypeConverter typeConverter = exchange.getContext().getTypeConverter();
+        if (typeConverter == null) {
+            typeConverter = defaultTypeConverter;
+        }
+
+        for (Map.Entry<String, Object> header : headers.entrySet()) {
+            if (shouldDeserialize(header)) {
+                header.setValue(typeConverter.convertTo(String.class, 
header.getValue()));
+            }
+        }
+    }
+
+    /**
+     * Fallback conversion strategy supporting null values, String and byte[]. 
Converts headers to respective String
+     * representation or null.
+     *
+     * @param  type     target type, always String in this case.
+     * @param  exchange the exchange containing all headers to convert.
+     * @param  value    the current value to convert.
+     * @return          String representation of given value or null if value 
itself is null.
+     */
+    private static Object convert(Class<?> type, Exchange exchange, Object 
value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        if (value instanceof byte[]) {
+            return new String((byte[]) value, StandardCharsets.UTF_8);
+        }
+
+        return value.toString();
+    }
+
+    /**
+     * Exclude special Kafka headers from auto deserialization.
+     *
+     * @param  entry
+     * @return
+     */
+    private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
+        return !entry.getKey().equals(KafkaConstants.HEADERS) && 
!entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
+    }
+
+    public void setEnabled(String enabled) {
+        this.enabled = Boolean.parseBoolean(enabled);
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java
new file mode 100644
index 00000000000..7dec28eb05b
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+
+public class DropField implements Processor {
+
+    String field;
+
+    /**
+     * Default constructor.
+     */
+    public DropField() {
+    }
+
+    /**
+     * Constructor using fields.
+     *
+     * @param field the field name to drop.
+     */
+    public DropField(String field, String value) {
+        this.field = field;
+    }
+
+    public void process(Exchange ex) throws InvalidPayloadException {
+        JsonNode body = ex.getMessage().getBody(JsonNode.class);
+        if (body == null) {
+            throw new InvalidPayloadException(ex, JsonNode.class);
+        }
+
+        if (body.getNodeType().equals(JsonNodeType.OBJECT)) {
+            ((ObjectNode) body).remove(field);
+            ex.getMessage().setBody(body);
+        }
+    }
+
+    public void setField(String field) {
+        this.field = field;
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java
new file mode 100644
index 00000000000..eab12202931
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+
+public class ExtractField implements Processor {
+
+    String field;
+    String headerOutputName;
+    boolean headerOutput;
+    boolean strictHeaderCheck;
+    boolean trimField;
+
+    static final String EXTRACTED_FIELD_HEADER = 
"CamelKameletsExtractFieldName";
+
+    /**
+     * Default constructor
+     */
+    public ExtractField() {
+    }
+
+    /**
+     * Constructor using field member.
+     *
+     * @param field the field name to extract.
+     */
+    public ExtractField(String field) {
+        this.field = field;
+    }
+
+    @Override
+    public void process(Exchange ex) throws InvalidPayloadException {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+
+        if (jsonNodeBody == null) {
+            throw new InvalidPayloadException(ex, JsonNode.class);
+
+        }
+
+        Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new 
TypeReference<Map<Object, Object>>() {
+        });
+        if (!headerOutput || (strictHeaderCheck && checkHeaderExistence(ex))) {
+            ex.getMessage().setBody(body.get(field));
+        } else {
+            extractToHeader(ex, body);
+        }
+        if (trimField) {
+            ex.setProperty("trimField", "true");
+        } else {
+            ex.setProperty("trimField", "false");
+        }
+    }
+
+    private void extractToHeader(Exchange ex, Map<Object, Object> body) {
+        if (headerOutputName == null || headerOutputName.isEmpty() || 
"none".equalsIgnoreCase(headerOutputName)) {
+            ex.getMessage().setHeader(EXTRACTED_FIELD_HEADER, body.get(field));
+        } else {
+            ex.getMessage().setHeader(headerOutputName, body.get(field));
+        }
+    }
+
+    private boolean checkHeaderExistence(Exchange exchange) {
+        if (headerOutputName == null || headerOutputName.isEmpty() || 
"none".equalsIgnoreCase(headerOutputName)) {
+            return 
exchange.getMessage().getHeaders().containsKey(EXTRACTED_FIELD_HEADER);
+        } else {
+            return 
exchange.getMessage().getHeaders().containsKey(headerOutputName);
+        }
+    }
+
+    public void setField(String field) {
+        this.field = field;
+    }
+
+    public void setHeaderOutput(boolean headerOutput) {
+        this.headerOutput = headerOutput;
+    }
+
+    public void setHeaderOutputName(String headerOutputName) {
+        this.headerOutputName = headerOutputName;
+    }
+
+    public void setStrictHeaderCheck(boolean strictHeaderCheck) {
+        this.strictHeaderCheck = strictHeaderCheck;
+    }
+
+    public void setTrimField(boolean trimField) {
+        this.trimField = trimField;
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java
new file mode 100644
index 00000000000..43fee2e79f0
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+
+public class HoistField {
+
+    public JsonNode process(@ExchangeProperty("field") String field, Exchange 
ex) throws InvalidPayloadException {
+        ObjectMapper mapper = new ObjectMapper();
+        Object body = ex.getMessage().getBody();
+        Map<Object, Object> updatedBody = new HashMap<>();
+        updatedBody.put(field, body);
+        return mapper.valueToTree(updatedBody);
+    }
+
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java
new file mode 100644
index 00000000000..0ccf0a2b6fe
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+import org.apache.camel.support.LanguageSupport;
+
+public class InsertField implements Processor {
+
+    String field;
+    String value;
+
+    /**
+     * Default constructor.
+     */
+    public InsertField() {
+    }
+
+    /**
+     * Constructor using fields.
+     *
+     * @param field the field name to insert.
+     * @param value the value of the new field.
+     */
+    public InsertField(String field, String value) {
+        this.field = field;
+        this.value = value;
+    }
+
+    public void process(Exchange ex) throws InvalidPayloadException {
+        JsonNode body = ex.getMessage().getBody(JsonNode.class);
+
+        if (body == null) {
+            throw new InvalidPayloadException(ex, JsonNode.class);
+        }
+
+        String resolvedValue;
+        if (LanguageSupport.hasSimpleFunction(value)) {
+            resolvedValue = 
ex.getContext().resolveLanguage("simple").createExpression(value).evaluate(ex, 
String.class);
+        } else {
+            resolvedValue = value;
+        }
+
+        switch (body.getNodeType()) {
+            case ARRAY:
+                ((ArrayNode) body).add(resolvedValue);
+                break;
+            case OBJECT:
+                ((ObjectNode) body).put(field, resolvedValue);
+                break;
+            default:
+                ((ObjectNode) body).put(field, resolvedValue);
+                break;
+        }
+
+        ex.getMessage().setBody(body);
+    }
+
+    public void setField(String field) {
+        this.field = field;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java
new file mode 100644
index 00000000000..ed83fbea232
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.util.ObjectHelper;
+
+public class MaskField {
+
+    private static final Map<Class<?>, Function<String, ?>> MAPPING_FUNC = new 
HashMap<>();
+    private static final Map<Class<?>, Object> BASIC_MAPPING = new HashMap<>();
+
+    static {
+        BASIC_MAPPING.put(Boolean.class, Boolean.FALSE);
+        BASIC_MAPPING.put(Byte.class, (byte) 0);
+        BASIC_MAPPING.put(Short.class, (short) 0);
+        BASIC_MAPPING.put(Integer.class, 0);
+        BASIC_MAPPING.put(Long.class, 0L);
+        BASIC_MAPPING.put(Float.class, 0f);
+        BASIC_MAPPING.put(Double.class, 0d);
+        BASIC_MAPPING.put(BigInteger.class, BigInteger.ZERO);
+        BASIC_MAPPING.put(BigDecimal.class, BigDecimal.ZERO);
+        BASIC_MAPPING.put(Date.class, new Date(0));
+        BASIC_MAPPING.put(String.class, "");
+
+        MAPPING_FUNC.put(Byte.class, Byte::parseByte);
+        MAPPING_FUNC.put(Short.class, Short::parseShort);
+        MAPPING_FUNC.put(Integer.class, Integer::parseInt);
+        MAPPING_FUNC.put(Long.class, Long::parseLong);
+        MAPPING_FUNC.put(Float.class, Float::parseFloat);
+        MAPPING_FUNC.put(Double.class, Double::parseDouble);
+        MAPPING_FUNC.put(String.class, Function.identity());
+        MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new);
+        MAPPING_FUNC.put(BigInteger.class, BigInteger::new);
+    }
+
+    public JsonNode process(
+            @ExchangeProperty("fields") String fields, 
@ExchangeProperty("replacement") String replacement, Exchange ex)
+            throws InvalidPayloadException {
+        ObjectMapper mapper = new ObjectMapper();
+        List<String> splittedFields = new ArrayList<>();
+        JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+        Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new 
TypeReference<Map<Object, Object>>() {
+        });
+        if (ObjectHelper.isNotEmpty(fields)) {
+            splittedFields = 
Arrays.stream(fields.split(",")).collect(Collectors.toList());
+        }
+
+        Map<Object, Object> updatedBody = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : body.entrySet()) {
+            final String fieldName = (String) entry.getKey();
+            final Object origFieldValue = entry.getValue();
+            updatedBody.put(fieldName,
+                    filterNames(fieldName, splittedFields) ? 
masked(origFieldValue, replacement) : origFieldValue);
+        }
+        if (!updatedBody.isEmpty()) {
+            return mapper.valueToTree(updatedBody);
+        } else {
+            return mapper.valueToTree(body);
+        }
+    }
+
+    boolean filterNames(String fieldName, List<String> splittedFields) {
+        return splittedFields.contains(fieldName);
+    }
+
+    private Object masked(Object value, String replacement) {
+        if (value == null) {
+            return null;
+        }
+        return replacement == null ? maskWithNullValue(value) : 
maskWithCustomReplacement(value, replacement);
+    }
+
+    private static Object maskWithCustomReplacement(Object value, String 
replacement) {
+        Function<String, ?> replacementMapper = 
MAPPING_FUNC.get(value.getClass());
+        if (replacementMapper == null) {
+            throw new IllegalArgumentException(
+                    "Unable to mask value of type " + value.getClass() + " 
with custom replacement.");
+        }
+        try {
+            return replacementMapper.apply(replacement);
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException(
+                    "Unable to convert " + replacement + " (" + 
replacement.getClass() + ") to number", ex);
+        }
+    }
+
+    private static Object maskWithNullValue(Object value) {
+        Object maskedValue = BASIC_MAPPING.get(value.getClass());
+        if (maskedValue == null) {
+            if (value instanceof List)
+                maskedValue = Collections.emptyList();
+            else if (value instanceof Map)
+                maskedValue = Collections.emptyMap();
+            else
+                throw new IllegalArgumentException("Unable to mask value of 
type: " + value.getClass());
+        }
+        return maskedValue;
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
new file mode 100644
index 00000000000..3b1ef2eb3dc
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class MessageTimestampRouter {
+
+    public void process(
+            @ExchangeProperty("topicFormat") String topicFormat, 
@ExchangeProperty("timestampFormat") String timestampFormat,
+            @ExchangeProperty("timestampKeys") String timestampKeys,
+            @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, 
Exchange ex)
+            throws ParseException {
+        final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
+
+        final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", 
Pattern.LITERAL);
+
+        final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
+        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        ObjectMapper mapper = new ObjectMapper();
+        List<String> splittedKeys = new ArrayList<>();
+        JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+        Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new 
TypeReference<Map<Object, Object>>() {
+        });
+        if (ObjectHelper.isNotEmpty(timestampKeys)) {
+            splittedKeys = 
Arrays.stream(timestampKeys.split(",")).collect(Collectors.toList());
+        }
+
+        Object rawTimestamp = null;
+        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
+        for (String key : splittedKeys) {
+            if (ObjectHelper.isNotEmpty(key)) {
+                rawTimestamp = body.get(key);
+                break;
+            }
+        }
+        Long timestamp = null;
+        if (ObjectHelper.isNotEmpty(timestampKeyFormat) && 
ObjectHelper.isNotEmpty(rawTimestamp)
+                && !timestampKeyFormat.equalsIgnoreCase("timestamp")) {
+            final SimpleDateFormat timestampKeyFmt = new 
SimpleDateFormat(timestampKeyFormat);
+            timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+            timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime();
+        } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+            timestamp = Long.parseLong(rawTimestamp.toString());
+        }
+        if (ObjectHelper.isNotEmpty(timestamp)) {
+            final String formattedTimestamp = fmt.format(new Date(timestamp));
+            String replace1;
+            String updatedTopic;
+
+            if (ObjectHelper.isNotEmpty(topicName)) {
+                replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
+                updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+            } else {
+                replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
+                updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+            }
+            ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
updatedTopic);
+        }
+    }
+
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
new file mode 100644
index 00000000000..517f1d0b787
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class RegexRouter {
+
+    public void process(
+            @ExchangeProperty("regex") String regex, 
@ExchangeProperty("replacement") String replacement, Exchange ex) {
+        Pattern regexPattern = Pattern.compile(regex);
+        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
+        if (ObjectHelper.isNotEmpty(topicName)) {
+            final Matcher matcher = regexPattern.matcher(topicName);
+            if (matcher.matches()) {
+                final String topicUpdated = matcher.replaceFirst(replacement);
+                ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
topicUpdated);
+            }
+        }
+        String ceType = ex.getMessage().getHeader("ce-type", String.class);
+        if (ObjectHelper.isNotEmpty(ceType)) {
+            final Matcher matcher = regexPattern.matcher(ceType);
+            if (matcher.matches()) {
+                final String ceTypeUpdated = matcher.replaceFirst(replacement);
+                ex.getMessage().setHeader("ce-type", ceTypeUpdated);
+            }
+        }
+    }
+
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java
new file mode 100644
index 00000000000..79048041f63
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.util.ObjectHelper;
+
+public class ReplaceField {
+
+    public JsonNode process(
+            @ExchangeProperty("enabled") String enabled, 
@ExchangeProperty("disabled") String disabled,
+            @ExchangeProperty("renames") String renames, Exchange ex)
+            throws InvalidPayloadException {
+        ObjectMapper mapper = new ObjectMapper();
+        List<String> enabledFields = new ArrayList<>();
+        List<String> disabledFields = new ArrayList<>();
+        List<String> renameFields = new ArrayList<>();
+        JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+        Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new 
TypeReference<Map<Object, Object>>() {
+        });
+        if (ObjectHelper.isNotEmpty(enabled) && 
!enabled.equalsIgnoreCase("all")) {
+            enabledFields = 
Arrays.stream(enabled.split(",")).collect(Collectors.toList());
+        }
+        if (ObjectHelper.isNotEmpty(disabled) && 
!disabled.equalsIgnoreCase("none")) {
+            disabledFields = 
Arrays.stream(disabled.split(",")).collect(Collectors.toList());
+        }
+        if (ObjectHelper.isNotEmpty(disabled)) {
+            renameFields = 
Arrays.stream(renames.split(",")).collect(Collectors.toList());
+        }
+        Map<Object, Object> updatedBody = new HashMap<>();
+
+        if (ObjectHelper.isNotEmpty(renameFields)) {
+            Map<String, String> renamingMap = parseNames(renameFields);
+
+            for (Map.Entry<Object, Object> entry : body.entrySet()) {
+                final String fieldName = (String) entry.getKey();
+                if (filterNames(fieldName, enabledFields, disabledFields)) {
+                    final Object fieldValue = entry.getValue();
+                    updatedBody.put(renameOptional(fieldName, renamingMap), 
fieldValue);
+                }
+            }
+        }
+        if (!updatedBody.isEmpty()) {
+            return mapper.valueToTree(updatedBody);
+        } else {
+            return mapper.valueToTree(body);
+        }
+    }
+
+    boolean filterNames(String fieldName, List<String> enabledFields, 
List<String> disabledFields) {
+        return !disabledFields.contains(fieldName) && (enabledFields.isEmpty() 
|| enabledFields.contains(fieldName));
+    }
+
+    static Map<String, String> parseNames(List<String> mappings) {
+        final Map<String, String> m = new HashMap<>();
+        for (String mapping : mappings) {
+            final String[] parts = mapping.split(":");
+            m.put(parts[0], parts[1]);
+        }
+        return m;
+    }
+
+    String renameOptional(String fieldName, Map<String, String> renames) {
+        final String mapping = renames.get(fieldName);
+        return mapping == null ? fieldName : mapping;
+    }
+
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
new file mode 100644
index 00000000000..9c660497f0e
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class TimestampRouter {
+
+    public void process(
+            @ExchangeProperty("topicFormat") String topicFormat, 
@ExchangeProperty("timestampFormat") String timestampFormat,
+            @ExchangeProperty("timestampHeaderName") String 
timestampHeaderName, Exchange ex) {
+        final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
+
+        final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", 
Pattern.LITERAL);
+
+        final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
+        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        Long timestamp = null;
+        String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, 
String.class);
+        Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
+        if (rawTimestamp instanceof Long) {
+            timestamp = (Long) rawTimestamp;
+        } else if (rawTimestamp instanceof Instant) {
+            timestamp = ((Instant) rawTimestamp).toEpochMilli();
+        } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+            timestamp = Long.parseLong(rawTimestamp.toString());
+        }
+        if (ObjectHelper.isNotEmpty(timestamp)) {
+            final String formattedTimestamp = fmt.format(new Date(timestamp));
+            String replace1;
+            String updatedTopic;
+
+            if (ObjectHelper.isNotEmpty(topicName)) {
+                replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
+                updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+            } else {
+                replace1 = 
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
+                updatedTopic = 
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+            }
+            ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, 
updatedTopic);
+        }
+    }
+
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
new file mode 100644
index 00000000000..629f4ebadb9
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.transform.aws2.ddb;
+
+import java.time.Instant;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.camel.Message;
+import 
org.apache.camel.component.kamelet.utils.serialization.gson.JavaTimeInstantTypeAdapter;
+import org.apache.camel.spi.DataType;
+import org.apache.camel.spi.DataTypeTransformer;
+import org.apache.camel.spi.Transformer;
+
+@DataTypeTransformer(name = "aws2-ddb:application-x-struct",
+                     description = "Transforms DynamoDB record into a Json 
node")
+public class Ddb2JsonStructDataTypeTransformer extends Transformer {
+
+    private final Gson gson = new GsonBuilder()
+            .registerTypeAdapter(Instant.class, new 
JavaTimeInstantTypeAdapter())
+            .create();
+
+    @Override
+    public void transform(Message message, DataType fromType, DataType toType) 
{
+        if (message.getBody() instanceof String) {
+            return;
+        }
+
+        message.setBody(gson.toJson(message.getBody()));
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
new file mode 100644
index 00000000000..8b97d3b9ff1
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform.kafka;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+
+public class BatchManualCommit implements Processor {
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        List<?> exchanges = exchange.getMessage().getBody(List.class);
+        if (exchanges.size() > 0) {
+            final Object tmp = exchanges.get(exchanges.size() - 1);
+            if (tmp instanceof Exchange element) {
+                KafkaManualCommit manual
+                        = 
element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+                if (manual != null) {
+                    manual.commit();
+                }
+            }
+        }
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
new file mode 100644
index 00000000000..db92916f2b9
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+
+public class ManualCommit implements Processor {
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        KafkaManualCommit manual = 
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        if (manual != null) {
+            manual.commit();
+        }
+    }
+}
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
new file mode 100644
index 00000000000..8333ab25cf6
--- /dev/null
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform.kafka;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class ValueToKey {
+
+    public void process(@ExchangeProperty("fields") String fields, Exchange 
ex) throws InvalidPayloadException {
+        List<String> splittedFields = new ArrayList<>();
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+        Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new 
TypeReference<Map<Object, Object>>() {
+        });
+        if (ObjectHelper.isNotEmpty(fields)) {
+            splittedFields = 
Arrays.stream(fields.split(",")).collect(Collectors.toList());
+        }
+        Map<Object, Object> key = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : body.entrySet()) {
+            final String fieldName = (String) entry.getKey();
+            if (filterNames(fieldName, splittedFields)) {
+                final Object fieldValue = entry.getValue();
+                key.put(entry.getKey(), fieldValue);
+            }
+        }
+
+        ex.getMessage().setHeader(KafkaConstants.KEY, key);
+    }
+
+    boolean filterNames(String fieldName, List<String> splittedFields) {
+        return splittedFields.contains(fieldName);
+    }
+}
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java
new file mode 100644
index 00000000000..9332887679b
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.camel.Exchange;
+import 
org.apache.camel.component.kamelet.utils.serialization.kafka.KafkaHeaderDeserializer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class KafkaHeaderDeserializerTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final KafkaHeaderDeserializer processor = new 
KafkaHeaderDeserializer();
+
+    @BeforeEach
+    void setup() {
+        this.camelContext = new DefaultCamelContext();
+    }
+
+    @Test
+    void shouldDeserializeHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", 
"barBytes".getBytes(StandardCharsets.UTF_8));
+        exchange.getMessage().setHeader("fooNull", null);
+        exchange.getMessage().setHeader("number", 1L);
+
+        processor.enabled = true;
+        processor.process(exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertEquals("barBytes", 
exchange.getMessage().getHeader("fooBytes"));
+        
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("fooNull"));
+        Assertions.assertNull(exchange.getMessage().getHeader("fooNull"));
+        Assertions.assertEquals("1", 
exchange.getMessage().getHeader("number"));
+    }
+
+    @Test
+    void shouldNotDeserializeHeadersWhenDisabled() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", 
"barBytes".getBytes(StandardCharsets.UTF_8));
+
+        processor.enabled = false;
+        processor.process(exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertTrue(exchange.getMessage().getHeader("fooBytes") 
instanceof byte[]);
+        
Assertions.assertEquals(Arrays.toString("barBytes".getBytes(StandardCharsets.UTF_8)),
+                Arrays.toString((byte[]) 
exchange.getMessage().getHeader("fooBytes")));
+    }
+}
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java
new file mode 100644
index 00000000000..94608d554f1
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ExtractFieldTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private ExtractField processor;
+
+    private final String baseJson = "{" + "\n" +
+                                    "  \"name\" : \"Rajesh Koothrappali\"" + 
"\n" +
+                                    "}";
+
+    @BeforeEach
+    void setup() {
+        camelContext = new DefaultCamelContext();
+        processor = new ExtractField();
+    }
+
+    @Test
+    void shouldExtractFieldFromJsonNode() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        processor.setField("name");
+        processor.process(exchange);
+
+        Assertions.assertEquals("Rajesh Koothrappali", 
exchange.getMessage().getBody(String.class));
+    }
+
+    @Test
+    void shouldExtractFieldToHeader() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        processor.setField("name");
+        processor.setHeaderOutput(true);
+        processor.setHeaderOutputName("name");
+        processor.process(exchange);
+
+        Assertions.assertEquals(baseJson, 
exchange.getMessage().getBody(String.class));
+        Assertions.assertEquals("Rajesh Koothrappali", 
exchange.getMessage().getHeader("name"));
+    }
+
+    @Test
+    void shouldExtractFieldToHeaderWithStrictHeaderCheck() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        processor.setField("name");
+        processor.setHeaderOutput(true);
+        processor.setHeaderOutputName("name");
+        processor.setStrictHeaderCheck(true);
+        processor.process(exchange);
+
+        Assertions.assertEquals(baseJson, 
exchange.getMessage().getBody(String.class));
+        Assertions.assertEquals("Rajesh Koothrappali", 
exchange.getMessage().getHeader("name"));
+
+        exchange.getMessage().setHeader("name", "somethingElse");
+
+        processor.process(exchange);
+
+        Assertions.assertEquals("Rajesh Koothrappali", 
exchange.getMessage().getBody(String.class));
+        Assertions.assertEquals("somethingElse", 
exchange.getMessage().getHeader("name"));
+    }
+
+    @Test
+    void shouldExtractFieldToDefaultHeader() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        processor.setField("name");
+        processor.setHeaderOutput(true);
+        processor.process(exchange);
+
+        Assertions.assertEquals(baseJson, 
exchange.getMessage().getBody(String.class));
+        Assertions.assertEquals("Rajesh Koothrappali", 
exchange.getMessage().getHeader(ExtractField.EXTRACTED_FIELD_HEADER));
+
+        exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        processor.setHeaderOutputName("none");
+        processor.process(exchange);
+
+        Assertions.assertEquals(baseJson, 
exchange.getMessage().getBody(String.class));
+        Assertions.assertEquals("Rajesh Koothrappali", 
exchange.getMessage().getHeader(ExtractField.EXTRACTED_FIELD_HEADER));
+    }
+
+    @Test
+    void shouldExtractFieldWithT() throws Exception {
+        final String baseJson = 
"{\"id\":\"1\",\"message\":\"Camel\\\\tRocks\"}";
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        processor.setField("message");
+        processor.setTrimField(true);
+        processor.process(exchange);
+
+        Assertions.assertEquals("Camel\\tRocks", 
exchange.getMessage().getBody());
+    }
+
+}
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java
new file mode 100644
index 00000000000..226328d3367
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class HoistFieldTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private HoistField processor;
+
+    private final String baseJson = "{" + "\n" +
+                                    "  \"name\" : \"Rajesh Koothrappali\"" + 
"\n" +
+                                    "}";
+
+    @BeforeEach
+    void setup() {
+        camelContext = new DefaultCamelContext();
+        processor = new HoistField();
+    }
+
+    @Test
+    void shouldHoistField() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode s = processor.process("element", exchange);
+        Assertions.assertEquals("{" + "\"element\"" + ":" + "{" +
+                                "\"name\":\"Rajesh Koothrappali\"" +
+                                "}" + "}",
+                s.toString());
+    }
+}
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java
new file mode 100644
index 00000000000..c1da17929cc
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class InsertFieldTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private InsertField processor;
+
+    private final String baseJson = "{" +
+                                    "\"name\":\"Rajesh Koothrappali\"" +
+                                    "}";
+
+    @BeforeEach
+    void setup() {
+        camelContext = new DefaultCamelContext();
+        processor = new InsertField();
+    }
+
+    @Test
+    void shouldAddFieldToPlainJson() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        processor = new InsertField("age", "29");
+        processor.process(exchange);
+
+        Assertions.assertEquals(exchange.getMessage().getBody(String.class), 
"{" + "\n" +
+                                                                             " 
 \"name\" : \"Rajesh Koothrappali\"," + "\n" +
+                                                                             " 
 \"age\" : \"29\"" + "\n" +
+                                                                             
"}");
+    }
+
+    @Test
+    void shouldAddFieldToArrayJson() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        String arrayJson = "[\"batman\",\"spiderman\",\"wonderwoman\"]";
+        exchange.getMessage().setBody(mapper.readTree(arrayJson));
+
+        processor.setValue("green lantern");
+        processor.process(exchange);
+
+        Assertions.assertEquals(exchange.getMessage().getBody(String.class),
+                "[ \"batman\", \"spiderman\", \"wonderwoman\", \"green 
lantern\" ]");
+    }
+}
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java
new file mode 100644
index 00000000000..18972e635ea
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class MaskFieldTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private MaskField processor;
+
+    private final String baseJson = "{" + "\n" +
+                                    "  \"name\" : \"Rajesh Koothrappali\"" + 
"\n" +
+                                    "}";
+
+    @BeforeEach
+    void setup() {
+        camelContext = new DefaultCamelContext();
+        processor = new MaskField();
+    }
+
+    @Test
+    void shouldMaskField() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode s = processor.process("name", "xxxx", exchange);
+        Assertions.assertEquals("\"xxxx\"", s.get("name").toString());
+    }
+
+    @Test
+    void shouldMaskFieldWithNull() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode s = processor.process("name", null, exchange);
+        Assertions.assertEquals("\"\"", s.get("name").toString());
+    }
+
+    @Test
+    void shouldMaskFieldList() throws Exception {
+        Map<String, List<String>> names = new HashMap<>();
+        Exchange exchange = new DefaultExchange(camelContext);
+        List<String> els = new ArrayList<>();
+        els.add("Sheldon");
+        els.add("Rajesh");
+        els.add("Leonard");
+        names.put("names", els);
+
+        exchange.getMessage().setBody(mapper.writeValueAsString(names));
+
+        JsonNode s = processor.process("names", null, exchange);
+        Assertions.assertEquals("[]", s.get("names").toString());
+    }
+}
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
new file mode 100644
index 00000000000..c1837a86718
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class RegexRouterTest {
+
+    private DefaultCamelContext camelContext;
+
+    private RegexRouter processor;
+
+    private final String topic = "hello";
+
+    @BeforeEach
+    void setup() {
+        camelContext = new DefaultCamelContext();
+        processor = new RegexRouter();
+    }
+
+    @Test
+    void shouldReplaceFieldToPlainJson() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic);
+
+        processor.process(".*ll.*", "newTopic", exchange);
+
+        Assertions.assertEquals("newTopic", 
exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC));
+    }
+}
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java
new file mode 100644
index 00000000000..2e6e4ebfcc4
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ReplaceFieldTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private ReplaceField processor;
+
+    private final String baseJson = "{" + "\n" +
+                                    "  \"name\" : \"Rajesh Koothrappali\"," + 
"\n" +
+                                    "  \"age\" : \"29\"" + "\n" +
+                                    "}";
+
+    @BeforeEach
+    void setup() {
+        camelContext = new DefaultCamelContext();
+        processor = new ReplaceField();
+    }
+
+    @Test
+    void shouldReplaceFieldToPlainJson() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode node = processor.process("all", "none", 
"name:firstName,age:years", exchange);
+
+        Assertions.assertEquals(node.toString(), "{" +
+                                                 "\"firstName\":\"Rajesh 
Koothrappali\"," +
+                                                 "\"years\":\"29\"" +
+                                                 "}");
+    }
+
+    @Test
+    void shouldReplaceFieldWithSpecificRename() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode node = processor.process("name,age", "none", 
"name:firstName", exchange);
+
+        Assertions.assertEquals(node.toString(), "{" +
+                                                 "\"firstName\":\"Rajesh 
Koothrappali\"," +
+                                                 "\"age\":\"29\"" +
+                                                 "}");
+    }
+
+    @Test
+    void shouldReplaceFieldWithSpecificRenameAndDisableFields() throws 
Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode node = processor.process("name", "none", "name:firstName", 
exchange);
+
+        Assertions.assertEquals(node.toString(), "{" +
+                                                 "\"firstName\":\"Rajesh 
Koothrappali\"" +
+                                                 "}");
+    }
+
+    @Test
+    void shouldReplaceFieldWithSpecificDisableFields() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode node = processor.process("all", "name,age", "name:firstName", 
exchange);
+
+        Assertions.assertEquals(node.toString(), "{" +
+                                                 "\"name\":\"Rajesh 
Koothrappali\"," +
+                                                 "\"age\":\"29\"" +
+                                                 "}");
+    }
+
+    @Test
+    void shouldReplaceFieldWithDisableAllFields() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+        JsonNode node = processor.process("none", "all", "name:firstName", 
exchange);
+
+        Assertions.assertEquals(node.toString(), "{" +
+                                                 "\"name\":\"Rajesh 
Koothrappali\"," +
+                                                 "\"age\":\"29\"" +
+                                                 "}");
+    }
+}

Reply via email to