This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit 4595e5f2e64ac4b0ad1113867bfd18b9142285cf
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Mon Nov 14 13:15:00 2022 +0100

    Introduce Kamelet input/output data types
    
    - Introduce data type converters
    - Add data type processor to auto convert exchange message from/to given 
data type
    - Let user choose which data type to use (via Kamelet property)
    - Add data type registry and annotation based loader to find data type 
implementations by component scheme and name
    
    Relates to CAMEL-18698 and apache/camel-k#1980
---
 .github/workflows/yaks-tests.yaml                  |   3 +-
 kamelets/aws-ddb-sink.kamelet.yaml                 |  27 +++-
 kamelets/aws-s3-source.kamelet.yaml                |  17 +++
 library/camel-kamelets-utils/pom.xml               |   7 +-
 .../utils/format/AnnotationDataTypeLoader.java     | 152 ++++++++++++++++++++
 .../kamelets/utils/format/DataTypeProcessor.java   |  67 +++++++++
 .../utils/format/DefaultDataTypeConverter.java     |  54 ++++++++
 .../utils/format/DefaultDataTypeRegistry.java      | 154 +++++++++++++++++++++
 .../converter/aws2/ddb/Ddb2JsonInputType.java}     |  87 ++++++++----
 .../converter/aws2/s3/AWS2S3BinaryOutputType.java  |  55 ++++++++
 .../converter/aws2/s3/AWS2S3JsonOutputType.java    |  63 +++++++++
 .../converter/standard/JsonModelDataType.java      |  66 +++++++++
 .../utils/format/spi/DataTypeConverter.java        |  39 ++++++
 .../kamelets/utils/format/spi/DataTypeLoader.java  |  31 +++++
 .../utils/format/spi/DataTypeRegistry.java         |  60 ++++++++
 .../utils/format/spi/annotations/DataType.java     |  51 +++++++
 .../META-INF/services/org/apache/camel/DataType    |  20 +++
 .../utils/format/DefaultDataTypeRegistryTest.java  |  57 ++++++++
 .../converter/aws2/ddb/Ddb2JsonInputTypeTest.java} | 104 +++++++++-----
 .../aws2/s3/AWS2S3JsonOutputTypeTest.java          |  98 +++++++++++++
 .../converter/standard/JsonModelDataTypeTest.java  |  84 +++++++++++
 .../src/test/resources/log4j2-test.xml             |  32 +++++
 .../resources/kamelets/aws-ddb-sink.kamelet.yaml   |  27 +++-
 .../resources/kamelets/aws-s3-source.kamelet.yaml  |  17 +++
 test/aws-s3/README.md                              |  76 ++++++++++
 test/aws-s3/amazonS3Client.groovy                  |  36 +++++
 test/aws-s3/aws-s3-credentials.properties          |   7 +
 test/aws-s3/aws-s3-inmem-binding.feature           |  49 +++++++
 test/aws-s3/aws-s3-source-property-conf.feature    |  37 +++++
 test/aws-s3/aws-s3-source-secret-conf.feature      |  39 ++++++
 test/aws-s3/aws-s3-source-uri-conf.feature         |  32 +++++
 test/aws-s3/aws-s3-to-inmem.yaml                   |  39 ++++++
 test/aws-s3/aws-s3-to-log-secret-based.groovy      |  21 +++
 test/aws-s3/aws-s3-to-log-uri-based.groovy         |  29 ++++
 test/aws-s3/aws-s3-uri-binding.feature             |  35 +++++
 test/aws-s3/aws-s3-uri-binding.yaml                |  37 +++++
 test/aws-s3/yaks-config.yaml                       |  65 +++++++++
 test/utils/inmem-to-log.yaml                       |  29 ++++
 38 files changed, 1829 insertions(+), 74 deletions(-)

diff --git a/.github/workflows/yaks-tests.yaml 
b/.github/workflows/yaks-tests.yaml
index 46acc626..defc5733 100644
--- a/.github/workflows/yaks-tests.yaml
+++ b/.github/workflows/yaks-tests.yaml
@@ -43,7 +43,7 @@ concurrency:
 env:
   CAMEL_K_VERSION: 1.10.3
   YAKS_VERSION: 0.11.0
-  YAKS_IMAGE_NAME: "docker.io/yaks/yaks"
+  YAKS_IMAGE_NAME: "docker.io/citrusframework/yaks"
   YAKS_RUN_OPTIONS: "--timeout=15m"
 
 jobs:
@@ -110,6 +110,7 @@ jobs:
       run: |
         echo "Running tests"
         yaks run test/aws-ddb-sink $YAKS_RUN_OPTIONS
+        yaks run test/aws-s3 $YAKS_RUN_OPTIONS
         yaks run test/extract-field-action $YAKS_RUN_OPTIONS
         yaks run test/insert-field-action $YAKS_RUN_OPTIONS
         yaks run test/mail-sink $YAKS_RUN_OPTIONS
diff --git a/kamelets/aws-ddb-sink.kamelet.yaml 
b/kamelets/aws-ddb-sink.kamelet.yaml
index 5b603abf..ba200347 100644
--- a/kamelets/aws-ddb-sink.kamelet.yaml
+++ b/kamelets/aws-ddb-sink.kamelet.yaml
@@ -97,6 +97,12 @@ spec:
         x-descriptors:
           - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
         default: false
+      inputFormat:
+        title: Input Type
+        description: Specify the input type for this Kamelet. The Kamelet will 
automatically apply conversion logic in order to transform message content to 
this data type.
+        type: string
+        default: json
+        example: json
   types:
     in:
       mediaType: application/json
@@ -107,17 +113,24 @@ spec:
   - "camel:aws2-ddb"
   - "camel:kamelet"
   template:
+    beans:
+    - name: dataTypeRegistry
+      type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
+    - name: inputTypeProcessor
+      type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
+      property:
+        - key: scheme
+          value: 'aws2-ddb'
+        - key: format
+          value: '{{inputFormat}}'
     from:
       uri: "kamelet:source"
       steps:
       - set-property:
-          name: operation
-          constant: "{{operation}}"
-      - unmarshal:
-          json:
-            library: Jackson
-            unmarshalType: com.fasterxml.jackson.databind.JsonNode
-      - bean: 
"org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter"
+        name: operation
+        constant: "{{operation}}"
+      - process:
+          ref: "{{inputTypeProcessor}}"
       - to:
           uri: "aws2-ddb:{{table}}"
           parameters:
diff --git a/kamelets/aws-s3-source.kamelet.yaml 
b/kamelets/aws-s3-source.kamelet.yaml
index 6ab2bca4..e09cf4aa 100644
--- a/kamelets/aws-s3-source.kamelet.yaml
+++ b/kamelets/aws-s3-source.kamelet.yaml
@@ -107,6 +107,12 @@ spec:
         description: The number of milliseconds before the next poll of the 
selected bucket.
         type: integer
         default: 500
+      outputFormat:
+        title: Output Type
+        description: Choose the output type for this Kamelet. The Kamelet 
supports different output types and performs automatic message conversion 
according to this data type.
+        type: string
+        default: binary
+        example: binary
   dependencies:
     - "camel:core"
     - "camel:aws2-s3"
@@ -114,6 +120,15 @@ spec:
     - "camel:kamelet"
   template:
     beans:
+      - name: dataTypeRegistry
+        type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
+      - name: outputTypeProcessor
+        type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
+        property:
+          - key: scheme
+            value: 'aws2-s3'
+          - key: format
+            value: '{{outputFormat}}'
       - name: renameHeaders
         type: 
"#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:
@@ -143,4 +158,6 @@ spec:
       steps:
       - process:
           ref: "{{renameHeaders}}"
+      - process:
+          ref: "{{outputTypeProcessor}}"
       - to: "kamelet:sink"
diff --git a/library/camel-kamelets-utils/pom.xml 
b/library/camel-kamelets-utils/pom.xml
index 4f848d36..5b1441f3 100644
--- a/library/camel-kamelets-utils/pom.xml
+++ b/library/camel-kamelets-utils/pom.xml
@@ -71,12 +71,17 @@
             <artifactId>camel-kafka</artifactId>
         </dependency>
 
-        <!-- AWS Dynamo DB camel component -->
+        <!-- Optional dependencies for data type conversion -->
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-aws2-ddb</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-s3</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Test scoped dependencies -->
         <dependency>
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java
new file mode 100644
index 00000000..96ca50eb
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.camel.TypeConverterLoaderException;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import org.apache.camel.spi.Injector;
+import org.apache.camel.spi.PackageScanClassResolver;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data type loader scans packages for {@link DataTypeConverter} classes 
annotated with {@link DataType} annotation.
+ */
+public class AnnotationDataTypeLoader implements DataTypeLoader {
+
+    public static final String META_INF_SERVICES = 
"META-INF/services/org/apache/camel/DataType";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AnnotationDataTypeLoader.class);
+
+    protected final PackageScanClassResolver resolver;
+    protected final Injector injector;
+
+    protected Set<Class<?>> visitedClasses = new HashSet<>();
+    protected Set<String> visitedURIs = new HashSet<>();
+
+    public AnnotationDataTypeLoader(Injector injector, 
PackageScanClassResolver resolver) {
+        this.injector = injector;
+        this.resolver = resolver;
+    }
+
+    @Override
+    public void load(DataTypeRegistry registry) {
+        Set<String> packages = new HashSet<>();
+
+        LOG.trace("Searching for {} services", META_INF_SERVICES);
+        try {
+            ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+            if (ccl != null) {
+                findPackages(packages, ccl);
+            }
+            findPackages(packages, getClass().getClassLoader());
+            if (packages.isEmpty()) {
+                LOG.debug("No package names found to be used for classpath 
scanning for annotated data types.");
+                return;
+            }
+        } catch (Exception e) {
+            throw new TypeConverterLoaderException(
+                    "Cannot find package names to be used for classpath 
scanning for annotated data types.", e);
+        }
+
+        // if there is any packages to scan and load @DataType classes, then 
do it
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Found data type packages to scan: {}", String.join(", 
", packages));
+        }
+        Set<Class<?>> scannedClasses = resolver.findAnnotated(DataType.class, 
packages.toArray(new String[]{}));
+        if (!scannedClasses.isEmpty()) {
+            LOG.debug("Found {} packages with {} @DataType classes to load", 
packages.size(), scannedClasses.size());
+
+            // load all the found classes into the type data type registry
+            for (Class<?> type : scannedClasses) {
+                if (acceptClass(type)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Loading data type annotation: {}", 
ObjectHelper.name(type));
+                    }
+                    loadDataType(registry, type);
+                }
+            }
+        }
+
+        // now clear the maps so we do not hold references
+        visitedClasses.clear();
+        visitedURIs.clear();
+    }
+
+    private void loadDataType(DataTypeRegistry registry, Class<?> type) {
+        if (visitedClasses.contains(type)) {
+            return;
+        }
+        visitedClasses.add(type);
+
+        try {
+            if (DataTypeConverter.class.isAssignableFrom(type) && 
type.isAnnotationPresent(DataType.class)) {
+                DataType dt = type.getAnnotation(DataType.class);
+                DataTypeConverter converter = (DataTypeConverter) 
injector.newInstance(type);
+                registry.addDataTypeConverter(dt.scheme(), converter);
+            }
+        } catch (NoClassDefFoundError e) {
+            LOG.debug("Ignoring converter type: {} as a dependent class could 
not be found: {}",
+                    type.getCanonicalName(), e, e);
+        }
+    }
+
+    protected boolean acceptClass(Class<?> type) {
+        return true;
+    }
+
+    protected void findPackages(Set<String> packages, ClassLoader classLoader) 
throws IOException {
+        Enumeration<URL> resources = 
classLoader.getResources(META_INF_SERVICES);
+        while (resources.hasMoreElements()) {
+            URL url = resources.nextElement();
+            String path = url.getPath();
+            if (!visitedURIs.contains(path)) {
+                // remember we have visited this uri so we wont read it twice
+                visitedURIs.add(path);
+                LOG.debug("Loading file {} to retrieve list of packages, from 
url: {}", META_INF_SERVICES, url);
+                try (BufferedReader reader = IOHelper.buffered(new 
InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
+                    while (true) {
+                        String line = reader.readLine();
+                        if (line == null) {
+                            break;
+                        }
+                        line = line.trim();
+                        if (line.startsWith("#") || line.length() == 0) {
+                            continue;
+                        }
+                        packages.add(line);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
new file mode 100644
index 00000000..859269fe
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import org.apache.camel.BeanInject;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * Processor applies data type conversion based on given format name. Searches 
for matching data type converter
+ * with given component scheme and format name.
+ */
+public class DataTypeProcessor implements Processor, CamelContextAware {
+
+    private CamelContext camelContext;
+
+    @BeanInject
+    private DefaultDataTypeRegistry dataTypeRegistry;
+
+    private String scheme;
+    private String format;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (format == null || format.isEmpty()) {
+            return;
+        }
+
+        dataTypeRegistry.lookup(scheme, format)
+                        .ifPresent(converter -> converter.convert(exchange));
+    }
+
+    public void setFormat(String format) {
+        this.format = format;
+    }
+
+    public void setScheme(String scheme) {
+        this.scheme = scheme;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverter.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverter.java
new file mode 100644
index 00000000..11680b50
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+
+/**
+ * Default data type converter receives a name and a target type in order to 
use traditional exchange body conversion
+ * mechanisms in order to transform the message body to a given type.
+ */
+public class DefaultDataTypeConverter implements DataTypeConverter {
+
+    private final String name;
+    private final Class<?> type;
+
+    public DefaultDataTypeConverter(String name, Class<?> type) {
+        this.name = name;
+        this.type = type;
+    }
+
+    @Override
+    public void convert(Exchange exchange) {
+        if (type.isInstance(exchange.getMessage().getBody())) {
+            return;
+        }
+
+        exchange.getMessage().setBody(exchange.getMessage().getBody(type));
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    public Class<?> getType() {
+        return type;
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
new file mode 100644
index 00000000..e7c6e3e8
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.engine.DefaultInjector;
+import org.apache.camel.impl.engine.DefaultPackageScanClassResolver;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeRegistry;
+import org.apache.camel.spi.PackageScanClassResolver;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Default data type registry able to resolve data types converters in the 
project. Data types may be defined at the component level
+ * via {@link org.apache.camel.kamelets.utils.format.spi.annotations.DataType} 
annotations. Also, users can add data types directly
+ * to the Camel context or manually to the registry.
+ *
+ * The registry is able to retrieve converters for a given data type based on 
the component scheme and the given data type name.
+ */
+public class DefaultDataTypeRegistry extends ServiceSupport implements 
DataTypeRegistry, CamelContextAware {
+
+    private CamelContext camelContext;
+
+    private PackageScanClassResolver resolver;
+
+    protected final List<DataTypeLoader> dataTypeLoaders = new ArrayList<>();
+
+    private final Map<String, List<DataTypeConverter>> dataTypeConverters = 
new HashMap<>();
+
+    @Override
+    public void addDataTypeConverter(String scheme, DataTypeConverter 
converter) {
+        this.getComponentDataTypeConverters(scheme).add(converter);
+    }
+
+    @Override
+    public Optional<DataTypeConverter> lookup(String scheme, String name) {
+        if (dataTypeLoaders.isEmpty()) {
+            try {
+                doInit();
+            } catch (Exception e) {
+                throw new RuntimeCamelException("Failed to initialize data 
type registry", e);
+            }
+        }
+
+        if (name == null) {
+            return Optional.empty();
+        }
+
+        Optional<DataTypeConverter> componentDataTypeConverter = 
getComponentDataTypeConverters(scheme).stream()
+                .filter(dtc -> name.equals(dtc.getName()))
+                .findFirst();
+
+        if (componentDataTypeConverter.isPresent()) {
+            return componentDataTypeConverter;
+        }
+
+        return getDefaultDataTypeConverter(name);
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+
+        if (resolver == null) {
+            if (camelContext != null) {
+                resolver = 
camelContext.adapt(ExtendedCamelContext.class).getPackageScanClassResolver();
+            } else {
+                resolver = new DefaultPackageScanClassResolver();
+            }
+        }
+
+        dataTypeLoaders.add(new AnnotationDataTypeLoader(new 
DefaultInjector(camelContext), resolver));
+
+        addDataTypeConverter(new DefaultDataTypeConverter("string", 
String.class));
+        addDataTypeConverter(new DefaultDataTypeConverter("binary", 
byte[].class));
+
+        for (DataTypeLoader loader : dataTypeLoaders) {
+            CamelContextAware.trySetCamelContext(loader, getCamelContext());
+            loader.load(this);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        this.dataTypeConverters.clear();
+    }
+
+    /**
+     * Retrieve default data output type from Camel context for given format 
name.
+     * @param name
+     * @return
+     */
+    private Optional<DataTypeConverter> getDefaultDataTypeConverter(String 
name) {
+        Optional<DataTypeConverter> dataTypeConverter = 
getComponentDataTypeConverters("camel").stream()
+                .filter(dtc -> name.equals(dtc.getName()))
+                .findFirst();
+
+        if (dataTypeConverter.isPresent()) {
+            return dataTypeConverter;
+        }
+
+        return 
Optional.ofNullable(camelContext.getRegistry().lookupByNameAndType(name, 
DataTypeConverter.class));
+    }
+
+    /**
+     * Retrieve list of data types defined on the component level for given 
scheme.
+     * @param scheme
+     * @return
+     */
+    private List<DataTypeConverter> getComponentDataTypeConverters(String 
scheme) {
+        if (!dataTypeConverters.containsKey(scheme)) {
+            dataTypeConverters.put(scheme, new ArrayList<>());
+        }
+
+        return dataTypeConverters.get(scheme);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
similarity index 69%
rename from 
library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
rename to 
library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
index c5098c1c..a15ff3a0 100644
--- 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
@@ -14,22 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.kamelets.utils.transform.aws.ddb;
 
+package org.apache.camel.kamelets.utils.format.converter.aws2.ddb;
+
+import java.io.InputStream;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
-import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.component.aws2.ddb.Ddb2Constants;
 import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
 import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
@@ -40,55 +45,78 @@ import 
software.amazon.awssdk.services.dynamodb.model.ReturnValue;
  *
  * Json property names map to attribute keys and Json property values map to 
attribute values.
  *
- * During mapping the Json property types resolve to the respective attribute 
types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}).
- * Primitive typed arrays in Json get mapped to {@code StringSet} or {@code 
NumberSet} attribute values.
+ * During mapping the Json property types resolve to the respective attribute 
types
+ * ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}). 
Primitive typed arrays in Json get mapped to
+ * {@code StringSet} or {@code NumberSet} attribute values.
+ *
+ * The input type supports the operations: PutItem, UpdateItem, DeleteItem
  *
  * For PutItem operation the Json body defines all item attributes.
  *
  * For DeleteItem operation the Json body defines only the primary key 
attributes that identify the item to delete.
  *
- * For UpdateItem operation the Json body defines both key attributes to 
identify the item to be updated and all item attributes tht get updated on the 
item.
+ * For UpdateItem operation the Json body defines both key attributes to 
identify the item to be updated and all item
+ * attributes tht get updated on the item.
+ *
+ * The given Json body can use "operation", "key" and "item" as top level 
properties. Both define a Json object that
+ * will be mapped to respective attribute value maps:
  *
- * The given Json body can use "key" and "item" as top level properties.
- * Both define a Json object that will be mapped to respective attribute value 
maps:
- * <pre>{@code
+ * <pre>
+ * {@code
  * {
+ *   "operation": "PutItem"
  *   "key": {},
  *   "item": {}
  * }
  * }
  * </pre>
- * The converter will extract the objects and set respective attribute value 
maps as header entries.
- * This is a comfortable way to define different key and item attribute value 
maps e.g. on UpdateItem operation.
  *
- * In case key and item attribute value maps are identical you can omit the 
special top level properties completely.
- * The converter will map the whole Json body as is then and use it as source 
for the attribute value map.
+ * The converter will extract the objects and set respective attribute value 
maps as header entries. This is a
+ * comfortable way to define different key and item attribute value maps e.g. 
on UpdateItem operation.
+ *
+ * In case key and item attribute value maps are identical you can omit the 
special top level properties completely. The
+ * converter will map the whole Json body as is then and use it as source for 
the attribute value map.
  */
-public class JsonToDdbModelConverter {
+@DataType(scheme = "aws2-ddb", name = "json")
+public class Ddb2JsonInputType implements DataTypeConverter {
+
+    private final JacksonDataFormat dataFormat = new JacksonDataFormat(new 
ObjectMapper(), JsonNode.class);
 
-    public String process(@ExchangeProperty("operation") String operation, 
Exchange exchange) throws InvalidPayloadException {
+    @Override
+    public void convert(Exchange exchange) {
         if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) 
||
                 
exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) {
-            return "";
+            return;
         }
 
-        ObjectMapper mapper = new ObjectMapper();
+        JsonNode jsonBody = getBodyAsJsonNode(exchange);
+
+        String operation
+                = 
Optional.ofNullable(jsonBody.get("operation")).map(JsonNode::asText).orElse(Ddb2Operations.PutItem.name());
+        if (exchange.hasProperties() && exchange.getProperty("operation", 
String.class) != null) {
+            operation = exchange.getProperty("operation", String.class);
+        }
 
-        JsonNode jsonBody = 
exchange.getMessage().getMandatoryBody(JsonNode.class);
+        if 
(exchange.getIn().getHeaders().containsKey(Ddb2Constants.OPERATION)) {
+            operation = exchange.getIn().getHeader(Ddb2Constants.OPERATION, 
Ddb2Operations.class).name();
+        }
 
         JsonNode key = jsonBody.get("key");
         JsonNode item = jsonBody.get("item");
 
         Map<String, Object> keyProps;
         if (key != null) {
-            keyProps = mapper.convertValue(key, new TypeReference<Map<String, 
Object>>(){});
+            keyProps = dataFormat.getObjectMapper().convertValue(key, new 
TypeReference<Map<String, Object>>() {
+            });
         } else {
-            keyProps = mapper.convertValue(jsonBody, new 
TypeReference<Map<String, Object>>(){});
+            keyProps = dataFormat.getObjectMapper().convertValue(jsonBody, new 
TypeReference<Map<String, Object>>() {
+            });
         }
 
         Map<String, Object> itemProps;
         if (item != null) {
-            itemProps = mapper.convertValue(item, new 
TypeReference<Map<String, Object>>(){});
+            itemProps = dataFormat.getObjectMapper().convertValue(item, new 
TypeReference<Map<String, Object>>() {
+            });
         } else {
             itemProps = keyProps;
         }
@@ -115,8 +143,18 @@ public class JsonToDdbModelConverter {
             default:
                 throw new 
UnsupportedOperationException(String.format("Unsupported operation '%s'", 
operation));
         }
+    }
 
-        return "";
+    private JsonNode getBodyAsJsonNode(Exchange exchange) {
+        try {
+            if (exchange.getMessage().getBody() instanceof JsonNode) {
+                return exchange.getMessage().getMandatoryBody(JsonNode.class);
+            }
+
+            return (JsonNode) dataFormat.unmarshal(exchange, 
exchange.getMessage().getMandatoryBody(InputStream.class));
+        } catch (Exception e) {
+            throw new CamelExecutionException("Failed to get mandatory Json 
node from message body", exchange, e);
+        }
     }
 
     private void setHeaderIfNotPresent(String headerName, Object value, 
Exchange exchange) {
@@ -165,11 +203,12 @@ public class JsonToDdbModelConverter {
         }
 
         if (value instanceof int[]) {
-            return AttributeValue.builder().ns(Stream.of((int[]) 
value).map(Object::toString).collect(Collectors.toList())).build();
+            return AttributeValue.builder().ns(Stream.of((int[]) 
value).map(Object::toString).collect(Collectors.toList()))
+                    .build();
         }
 
         if (value instanceof List) {
-            List<?> values = ((List<?>) value);
+            List<?> values = (List<?>) value;
 
             if (values.isEmpty()) {
                 return AttributeValue.builder().ss().build();
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3BinaryOutputType.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3BinaryOutputType.java
new file mode 100644
index 00000000..6065ebd1
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3BinaryOutputType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import software.amazon.awssdk.utils.IoUtils;
+
+/**
+ * Binary output type.
+ */
+@DataType(scheme = "aws2-s3", name = "binary")
+public class AWS2S3BinaryOutputType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        if (exchange.getMessage().getBody() instanceof byte[]) {
+            return;
+        }
+
+        try {
+            InputStream is = exchange.getMessage().getBody(InputStream.class);
+            if (is != null) {
+                exchange.getMessage().setBody(IoUtils.toByteArray(is));
+                return;
+            }
+
+            // Use default Camel converter utils to convert body to byte[]
+            
exchange.getMessage().setBody(exchange.getMessage().getMandatoryBody(byte[].class));
+        } catch (IOException | InvalidPayloadException e) {
+            throw new CamelExecutionException("Failed to convert AWS S3 body 
to byte[]", exchange, e);
+        }
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputType.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputType.java
new file mode 100644
index 00000000..74736d67
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.utils.IoUtils;
+
+/**
+ * Json output data type represents file name as key and file content as Json 
structure.
+ * <p/>
+ * Example Json structure: { "key": "myFile.txt", "content": "Hello", }
+ */
+@DataType(scheme = "aws2-s3", name = "json")
+public class AWS2S3JsonOutputType implements DataTypeConverter {
+
+    private static final String TEMPLATE = "{" +
+            "\"key\": \"%s\", " +
+            "\"content\": \"%s\"" +
+            "}";
+
+    @Override
+    public void convert(Exchange exchange) {
+        String key = exchange.getMessage().getHeader(AWS2S3Constants.KEY, 
String.class);
+
+        ResponseInputStream<?> bodyInputStream = 
exchange.getMessage().getBody(ResponseInputStream.class);
+        if (bodyInputStream != null) {
+            try {
+                exchange.getMessage().setBody(String.format(TEMPLATE, key, 
IoUtils.toUtf8String(bodyInputStream)));
+                return;
+            } catch (IOException e) {
+                throw new CamelExecutionException("Failed to convert AWS S3 
body to Json", exchange, e);
+            }
+        }
+
+        byte[] bodyContent = exchange.getMessage().getBody(byte[].class);
+        if (bodyContent != null) {
+            exchange.getMessage().setBody(String.format(TEMPLATE, key, new 
String(bodyContent, StandardCharsets.UTF_8)));
+        }
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
new file mode 100644
index 00000000..047e6dd5
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.standard;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Data type converter able to unmarshal to given unmarshalType using jackson 
data format.
+ * <p/>
+ * Unmarshal type should be given as a fully qualified class name in the 
exchange properties.
+ */
+@DataType(name = "jsonObject")
+public class JsonModelDataType implements DataTypeConverter {
+
+    public static final String JSON_DATA_TYPE_KEY = "CamelJsonModelDataType";
+
+    @Override
+    public void convert(Exchange exchange) {
+        if (!exchange.hasProperties() || 
!exchange.getProperties().containsKey(JSON_DATA_TYPE_KEY)) {
+            return;
+        }
+
+        String type = exchange.getProperty(JSON_DATA_TYPE_KEY, String.class);
+        try (JacksonDataFormat dataFormat = new JacksonDataFormat(new 
ObjectMapper(), Class.forName(type))) {
+            Object unmarshalled = dataFormat.unmarshal(exchange, 
getBodyAsStream(exchange));
+            exchange.getMessage().setBody(unmarshalled);
+        } catch (Exception e) {
+            throw new CamelExecutionException(
+                    String.format("Failed to load Json unmarshalling type 
'%s'", type), exchange, e);
+        }
+    }
+
+    private InputStream getBodyAsStream(Exchange exchange) throws 
InvalidPayloadException {
+        InputStream bodyStream = 
exchange.getMessage().getBody(InputStream.class);
+
+        if (bodyStream == null) {
+            bodyStream = new 
ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+        }
+
+        return bodyStream;
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverter.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverter.java
new file mode 100644
index 00000000..d39d30f8
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.spi;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+@FunctionalInterface
+public interface DataTypeConverter {
+
+    void convert(Exchange exchange);
+
+    /**
+     * Gets the data type converter name. Automatically derives the name from 
given type annotation.
+     * @return
+     */
+    default String getName() {
+        if (this.getClass().isAnnotationPresent(DataType.class)) {
+            return this.getClass().getAnnotation(DataType.class).name();
+        }
+
+        throw new UnsupportedOperationException("Missing data type converter 
name");
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeLoader.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeLoader.java
new file mode 100644
index 00000000..73f87c69
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeLoader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.spi;
+
+/**
+ * A pluggable strategy to load data types into a {@link DataTypeRegistry}.
+ */
+public interface DataTypeLoader {
+
+    /**
+     * A pluggable strategy to load data types into a registry.
+     *
+     * @param  registry the registry to load the data types into
+     */
+    void load(DataTypeRegistry registry);
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeRegistry.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeRegistry.java
new file mode 100644
index 00000000..cb2bedc9
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.spi;
+
+import java.util.Optional;
+
+/**
+ * Registry for data types. Data type loaders should be used to add types to 
the registry.
+ * <p/>
+ * The registry is able to perform a lookup of a specific data type.
+ */
+public interface DataTypeRegistry {
+
+    /**
+     * Registers a new default data type converter.
+     * @param scheme
+     * @param converter
+     */
+    void addDataTypeConverter(String scheme, DataTypeConverter converter);
+
+    /**
+     * Registers a new default data type converter.
+     * @param converter
+     */
+    default void addDataTypeConverter(DataTypeConverter converter) {
+        addDataTypeConverter("camel", converter);
+    }
+
+    /**
+     * Find data type for given component scheme and data type name.
+     * @param scheme
+     * @param name
+     * @return
+     */
+    Optional<DataTypeConverter> lookup(String scheme, String name);
+
+    /**
+     * Find data type for given data type name.
+     * @param name
+     * @return
+     */
+    default Optional<DataTypeConverter> lookup(String name) {
+        return lookup("camel", name);
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/annotations/DataType.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/annotations/DataType.java
new file mode 100644
index 00000000..b1d4f5a9
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/annotations/DataType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.spi.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Data type annotation defines a type with its component scheme, a name and 
input/output types.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target({ ElementType.TYPE })
+public @interface DataType {
+
+    /**
+     * Camel component scheme.
+     * @return
+     */
+    String scheme() default "camel";
+
+    /**
+     * Data type name.
+     * @return
+     */
+    String name();
+
+    /**
+     * The media type associated with this data type.
+     * @return
+     */
+    String mediaType() default "";
+}
diff --git 
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
new file mode 100644
index 00000000..b51d3404
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.kamelets.utils.format.converter.standard
+org.apache.camel.kamelets.utils.format.converter.aws2.ddb
+org.apache.camel.kamelets.utils.format.converter.aws2.s3
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
new file mode 100644
index 00000000..2ee4113e
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.util.Optional;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.impl.DefaultCamelContext;
+import 
org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class DefaultDataTypeRegistryTest {
+
+    private DefaultCamelContext camelContext;
+
+    private DefaultDataTypeRegistry dataTypeRegistry = new 
DefaultDataTypeRegistry();
+
+    @BeforeEach
+    void setup() {
+        this.camelContext = new DefaultCamelContext();
+        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
+    }
+
+    @Test
+    public void shouldLookupDefaultDataTypeConverters() throws Exception {
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( 
"jsonObject");
+        Assertions.assertTrue(converter.isPresent());
+        Assertions.assertEquals(JsonModelDataType.class, 
converter.get().getClass());
+        converter = dataTypeRegistry.lookup( "string");
+        Assertions.assertTrue(converter.isPresent());
+        Assertions.assertEquals(DefaultDataTypeConverter.class, 
converter.get().getClass());
+        Assertions.assertEquals(String.class, ((DefaultDataTypeConverter) 
converter.get()).getType());
+        converter = dataTypeRegistry.lookup( "binary");
+        Assertions.assertTrue(converter.isPresent());
+        Assertions.assertEquals(DefaultDataTypeConverter.class, 
converter.get().getClass());
+        Assertions.assertEquals(byte[].class, ((DefaultDataTypeConverter) 
converter.get()).getType());
+    }
+
+}
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
similarity index 65%
rename from 
library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
rename to 
library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
index 33d27bfe..7f1f9e9f 100644
--- 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
@@ -14,16 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.kamelets.utils.transform.aws.ddb;
+
+package org.apache.camel.kamelets.utils.format.converter.aws2.ddb;
 
 import java.util.Map;
+import java.util.Optional;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Exchange;
-import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.component.aws2.ddb.Ddb2Constants;
 import org.apache.camel.component.aws2.ddb.Ddb2Operations;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -33,25 +38,25 @@ import 
software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
 import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
 
-class JsonToDdbModelConverterTest {
+public class Ddb2JsonInputTypeTest {
 
     private DefaultCamelContext camelContext;
 
     private final ObjectMapper mapper = new ObjectMapper();
 
-    private final JsonToDdbModelConverter processor = new 
JsonToDdbModelConverter();
+    private final Ddb2JsonInputType inputType = new Ddb2JsonInputType();
 
     private final String keyJson = "{" +
-                "\"name\": \"Rajesh Koothrappali\"" +
+            "\"name\": \"Rajesh Koothrappali\"" +
             "}";
 
     private final String itemJson = "{" +
-                "\"name\": \"Rajesh Koothrappali\"," +
-                "\"age\": 29," +
-                "\"super-heroes\": [\"batman\", \"spiderman\", 
\"wonderwoman\"]," +
-                "\"issues\": [5, 3, 9, 1]," +
-                "\"girlfriend\": null," +
-                "\"doctorate\": true" +
+            "\"name\": \"Rajesh Koothrappali\"," +
+            "\"age\": 29," +
+            "\"super-heroes\": [\"batman\", \"spiderman\", \"wonderwoman\"]," +
+            "\"issues\": [5, 3, 9, 1]," +
+            "\"girlfriend\": null," +
+            "\"doctorate\": true" +
             "}";
 
     @BeforeEach
@@ -65,8 +70,8 @@ class JsonToDdbModelConverterTest {
         Exchange exchange = new DefaultExchange(camelContext);
 
         exchange.getMessage().setBody(mapper.readTree(itemJson));
-
-        processor.process(Ddb2Operations.PutItem.name(), exchange);
+        exchange.setProperty("operation", Ddb2Operations.PutItem.name());
+        inputType.convert(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals(Ddb2Operations.PutItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
@@ -80,9 +85,10 @@ class JsonToDdbModelConverterTest {
     void shouldMapUpdateItemHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + 
", \"item\": " + itemJson + "}"));
+        exchange.getMessage().setBody(mapper.readTree("{\"operation\": \"" + 
Ddb2Operations.UpdateItem.name() + "\", \"key\": "
+                + keyJson + ", \"item\": " + itemJson + "}"));
 
-        processor.process(Ddb2Operations.UpdateItem.name(), exchange);
+        inputType.convert(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals(Ddb2Operations.UpdateItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
@@ -101,8 +107,9 @@ class JsonToDdbModelConverterTest {
         Exchange exchange = new DefaultExchange(camelContext);
 
         exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + 
"}"));
+        exchange.setProperty("operation", Ddb2Operations.DeleteItem.name());
 
-        processor.process(Ddb2Operations.DeleteItem.name(), exchange);
+        inputType.convert(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals(Ddb2Operations.DeleteItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
@@ -119,8 +126,8 @@ class JsonToDdbModelConverterTest {
         Exchange exchange = new DefaultExchange(camelContext);
 
         exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson 
+ "}"));
-
-        processor.process(Ddb2Operations.PutItem.name(), exchange);
+        exchange.setProperty("operation", Ddb2Operations.PutItem.name());
+        inputType.convert(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals(Ddb2Operations.PutItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
@@ -130,11 +137,12 @@ class JsonToDdbModelConverterTest {
         Assertions.assertEquals(1L, attributeValueMap.size());
 
         
Assertions.assertEquals("AttributeValue(M={name=AttributeValue(S=Rajesh 
Koothrappali), " +
-                "age=AttributeValue(N=29), " +
-                "super-heroes=AttributeValue(SS=[batman, spiderman, 
wonderwoman]), " +
-                "issues=AttributeValue(NS=[5, 3, 9, 1]), " +
-                "girlfriend=AttributeValue(NUL=true), " +
-                "doctorate=AttributeValue(BOOL=true)})", 
attributeValueMap.get("user").toString());
+                        "age=AttributeValue(N=29), " +
+                        "super-heroes=AttributeValue(SS=[batman, spiderman, 
wonderwoman]), " +
+                        "issues=AttributeValue(NS=[5, 3, 9, 1]), " +
+                        "girlfriend=AttributeValue(NUL=true), " +
+                        "doctorate=AttributeValue(BOOL=true)})",
+                attributeValueMap.get("user").toString());
     }
 
     @Test
@@ -142,9 +150,10 @@ class JsonToDdbModelConverterTest {
     void shouldMapEmptyJson() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setBody(mapper.readTree("{}"));
+        exchange.getMessage().setBody("{}");
+        exchange.getMessage().setHeader(Ddb2Constants.OPERATION, 
Ddb2Operations.PutItem.name());
 
-        processor.process(Ddb2Operations.PutItem.name(), exchange);
+        inputType.convert(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals(Ddb2Operations.PutItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
@@ -154,20 +163,39 @@ class JsonToDdbModelConverterTest {
         Assertions.assertEquals(0L, attributeValueMap.size());
     }
 
-    @Test()
+    @Test
+    void shouldFailForWrongBodyType() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody("Hello");
+
+        Assertions.assertThrows(CamelExecutionException.class, () -> 
inputType.convert(exchange));
+    }
+
+    @Test
     void shouldFailForUnsupportedOperation() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
         exchange.getMessage().setBody(mapper.readTree("{}"));
+        exchange.setProperty("operation", Ddb2Operations.BatchGetItems.name());
 
-        Assertions.assertThrows(UnsupportedOperationException.class, () -> 
processor.process(Ddb2Operations.BatchGetItems.name(), exchange));
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> 
inputType.convert(exchange));
+    }
+
+    @Test
+    public void shouldLookupDataType() throws Exception {
+        DefaultDataTypeRegistry dataTypeRegistry = new 
DefaultDataTypeRegistry();
+        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
+        Optional<DataTypeConverter> converter = 
dataTypeRegistry.lookup("aws2-ddb", "json");
+        Assertions.assertTrue(converter.isPresent());
     }
 
     private void assertAttributeValueMap(Map<String, AttributeValue> 
attributeValueMap) {
         Assertions.assertEquals(6L, attributeValueMap.size());
         Assertions.assertEquals(AttributeValue.builder().s("Rajesh 
Koothrappali").build(), attributeValueMap.get("name"));
         Assertions.assertEquals(AttributeValue.builder().n("29").build(), 
attributeValueMap.get("age"));
-        Assertions.assertEquals(AttributeValue.builder().ss("batman", 
"spiderman", "wonderwoman").build(), attributeValueMap.get("super-heroes"));
+        Assertions.assertEquals(AttributeValue.builder().ss("batman", 
"spiderman", "wonderwoman").build(),
+                attributeValueMap.get("super-heroes"));
         Assertions.assertEquals(AttributeValue.builder().ns("5", "3", "9", 
"1").build(), attributeValueMap.get("issues"));
         Assertions.assertEquals(AttributeValue.builder().nul(true).build(), 
attributeValueMap.get("girlfriend"));
         Assertions.assertEquals(AttributeValue.builder().bool(true).build(), 
attributeValueMap.get("doctorate"));
@@ -175,11 +203,19 @@ class JsonToDdbModelConverterTest {
 
     private void assertAttributeValueUpdateMap(Map<String, 
AttributeValueUpdate> attributeValueMap) {
         Assertions.assertEquals(6L, attributeValueMap.size());
-        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh
 Koothrappali").build()).action(AttributeAction.PUT).build(), 
attributeValueMap.get("name"));
-        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build()).action(AttributeAction.PUT).build(),
 attributeValueMap.get("age"));
-        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman",
 "spiderman", "wonderwoman").build()).action(AttributeAction.PUT).build(), 
attributeValueMap.get("super-heroes"));
-        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5",
 "3", "9", "1").build()).action(AttributeAction.PUT).build(), 
attributeValueMap.get("issues"));
-        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build()).action(AttributeAction.PUT).build(),
 attributeValueMap.get("girlfriend"));
-        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build()).action(AttributeAction.PUT).build(),
 attributeValueMap.get("doctorate"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh
 Koothrappali").build())
+                .action(AttributeAction.PUT).build(), 
attributeValueMap.get("name"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build())
+                .action(AttributeAction.PUT).build(), 
attributeValueMap.get("age"));
+        Assertions.assertEquals(
+                
AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman", 
"spiderman", "wonderwoman").build())
+                        .action(AttributeAction.PUT).build(),
+                attributeValueMap.get("super-heroes"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5",
 "3", "9", "1").build())
+                .action(AttributeAction.PUT).build(), 
attributeValueMap.get("issues"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build())
+                .action(AttributeAction.PUT).build(), 
attributeValueMap.get("girlfriend"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build())
+                .action(AttributeAction.PUT).build(), 
attributeValueMap.get("doctorate"));
     }
 }
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputTypeTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputTypeTest.java
new file mode 100644
index 00000000..53357add
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputTypeTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.http.AbortableInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class AWS2S3JsonOutputTypeTest {
+
+    private final DefaultCamelContext camelContext = new DefaultCamelContext();
+
+    private final AWS2S3JsonOutputType outputType = new AWS2S3JsonOutputType();
+
+    @Test
+    void shouldMapFromStringToJsonModel() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test1.txt");
+        exchange.getMessage().setBody("Test1");
+        outputType.convert(exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        assertEquals("test1.txt", 
exchange.getMessage().getHeader(AWS2S3Constants.KEY));
+
+        assertJsonModelBody(exchange, "test1.txt", "Test1");
+    }
+
+    @Test
+    void shouldMapFromBytesToJsonModel() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test2.txt");
+        
exchange.getMessage().setBody("Test2".getBytes(StandardCharsets.UTF_8));
+        outputType.convert(exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        assertEquals("test2.txt", 
exchange.getMessage().getHeader(AWS2S3Constants.KEY));
+
+        assertJsonModelBody(exchange, "test2.txt", "Test2");
+    }
+
+    @Test
+    void shouldMapFromInputStreamToJsonModel() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test3.txt");
+        exchange.getMessage().setBody(new 
ResponseInputStream<>(GetObjectRequest.builder().bucket("myBucket").key("test3.txt").build(),
+                AbortableInputStream.create(new 
ByteArrayInputStream("Test3".getBytes(StandardCharsets.UTF_8)))));
+        outputType.convert(exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        assertEquals("test3.txt", 
exchange.getMessage().getHeader(AWS2S3Constants.KEY));
+
+        assertJsonModelBody(exchange, "test3.txt", "Test3");
+    }
+
+    @Test
+    public void shouldLookupDataType() throws Exception {
+        DefaultDataTypeRegistry dataTypeRegistry = new 
DefaultDataTypeRegistry();
+        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
+        Optional<DataTypeConverter> converter = 
dataTypeRegistry.lookup("aws2-s3", "json");
+        Assertions.assertTrue(converter.isPresent());
+    }
+
+    private static void assertJsonModelBody(Exchange exchange, String key, 
String content) {
+        assertEquals(String.format("{\"key\": \"%s\", \"content\": \"%s\"}", 
key, content), exchange.getMessage().getBody());
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
new file mode 100644
index 00000000..c175cc6d
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.standard;
+
+import java.util.Optional;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JsonModelDataTypeTest {
+
+    private final DefaultCamelContext camelContext = new DefaultCamelContext();
+
+    private final JsonModelDataType dataType = new JsonModelDataType();
+
+    @Test
+    void shouldMapFromStringToJsonModel() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.setProperty(JsonModelDataType.JSON_DATA_TYPE_KEY, 
Person.class.getName());
+        exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}");
+        dataType.convert(exchange);
+
+        assertEquals(Person.class, exchange.getMessage().getBody().getClass());
+        assertEquals("Sheldon", 
exchange.getMessage().getBody(Person.class).getName());
+    }
+
+    @Test
+    public void shouldLookupDataType() throws Exception {
+        DefaultDataTypeRegistry dataTypeRegistry = new 
DefaultDataTypeRegistry();
+        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
+        Optional<DataTypeConverter> converter = 
dataTypeRegistry.lookup("jsonObject");
+        Assertions.assertTrue(converter.isPresent());
+    }
+
+    public static class Person {
+        @JsonProperty
+        private String name;
+
+        @JsonProperty
+        private Long age;
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public Long getAge() {
+            return age;
+        }
+
+        public void setAge(Long age) {
+            this.age = age;
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/resources/log4j2-test.xml 
b/library/camel-kamelets-utils/src/test/resources/log4j2-test.xml
new file mode 100644
index 00000000..1d6d8f38
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/resources/log4j2-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<Configuration status="INFO">
+  <Appenders>
+    <Console name="STDOUT" target="SYSTEM_OUT">
+      <PatternLayout pattern="%-5level| %msg%n"/>
+    </Console>
+    <Null name="NONE"/>
+  </Appenders>
+
+  <Loggers>
+    <Root level="INFO">
+      <AppenderRef ref="STDOUT"/>
+    </Root>
+  </Loggers>
+
+</Configuration>
diff --git 
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml 
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
index 5b603abf..ba200347 100644
--- 
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
+++ 
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
@@ -97,6 +97,12 @@ spec:
         x-descriptors:
           - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
         default: false
+      inputFormat:
+        title: Input Type
+        description: Specify the input type for this Kamelet. The Kamelet will 
automatically apply conversion logic in order to transform message content to 
this data type.
+        type: string
+        default: json
+        example: json
   types:
     in:
       mediaType: application/json
@@ -107,17 +113,24 @@ spec:
   - "camel:aws2-ddb"
   - "camel:kamelet"
   template:
+    beans:
+    - name: dataTypeRegistry
+      type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
+    - name: inputTypeProcessor
+      type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
+      property:
+        - key: scheme
+          value: 'aws2-ddb'
+        - key: format
+          value: '{{inputFormat}}'
     from:
       uri: "kamelet:source"
       steps:
       - set-property:
-          name: operation
-          constant: "{{operation}}"
-      - unmarshal:
-          json:
-            library: Jackson
-            unmarshalType: com.fasterxml.jackson.databind.JsonNode
-      - bean: 
"org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter"
+        name: operation
+        constant: "{{operation}}"
+      - process:
+          ref: "{{inputTypeProcessor}}"
       - to:
           uri: "aws2-ddb:{{table}}"
           parameters:
diff --git 
a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml 
b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
index 6ab2bca4..e09cf4aa 100644
--- 
a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
+++ 
b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
@@ -107,6 +107,12 @@ spec:
         description: The number of milliseconds before the next poll of the 
selected bucket.
         type: integer
         default: 500
+      outputFormat:
+        title: Output Type
+        description: Choose the output type for this Kamelet. The Kamelet 
supports different output types and performs automatic message conversion 
according to this data type.
+        type: string
+        default: binary
+        example: binary
   dependencies:
     - "camel:core"
     - "camel:aws2-s3"
@@ -114,6 +120,15 @@ spec:
     - "camel:kamelet"
   template:
     beans:
+      - name: dataTypeRegistry
+        type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
+      - name: outputTypeProcessor
+        type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
+        property:
+          - key: scheme
+            value: 'aws2-s3'
+          - key: format
+            value: '{{outputFormat}}'
       - name: renameHeaders
         type: 
"#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:
@@ -143,4 +158,6 @@ spec:
       steps:
       - process:
           ref: "{{renameHeaders}}"
+      - process:
+          ref: "{{outputTypeProcessor}}"
       - to: "kamelet:sink"
diff --git a/test/aws-s3/README.md b/test/aws-s3/README.md
new file mode 100644
index 00000000..6e7d7315
--- /dev/null
+++ b/test/aws-s3/README.md
@@ -0,0 +1,76 @@
+# AWS S3 Kamelet test
+
+This test verifies the AWS S3 Kamelet source defined in 
[aws-s3-source.kamelet.yaml](aws-s3-source.kamelet.yaml)
+
+## Objectives
+
+The test verifies the AWS S3 Kamelet source by creating a Camel K integration 
that uses the Kamelet and listens for messages on the
+AWS S3 bucket.
+
+The test uses a [LocalStack 
Testcontainers](https://www.testcontainers.org/modules/localstack/) instance to 
start a local AWS S3 service for mocking reasons.
+The Kamelet and the test interact with the local AWS S3 service for validation 
of functionality.
+
+### Test Kamelet source
+
+The test performs the following high level steps for configs - URI, secret and 
property based:
+
+*Preparation*
+- Start the AWS S3 service as LocalStack container
+- Overwrite the Kamelet with the latest source
+- Prepare the Camel AWS S3 client
+
+*Scenario* 
+- Create the Kamelet in the current namespace in the cluster
+- Create the Camel K integration that uses the Kamelet source to consume data 
from AWS S3 service
+- Wait for the Camel K integration to start and listen for AWS S3 messages
+- Create a new message in the AWS S3 bucket
+- Verify that the integration has received the message event
+
+*Cleanup*
+- Stop the LocalStack container
+- Delete the Camel K integration
+- Delete the secret from the current namespacce
+
+## Installation
+
+The test assumes that you have access to a Kubernetes cluster and that the 
Camel K operator as well as the YAKS operator is installed
+and running.
+
+You can review the installation steps for the operators in the documentation:
+
+- [Install Camel K 
operator](https://camel.apache.org/camel-k/latest/installation/installation.html)
+- [Install YAKS operator](https://github.com/citrusframework/yaks#installation)
+
+## Run the tests
+
+To run tests with URI based configuration: 
+
+```shell script
+$ yaks test aws-s3-source-uri-conf.feature
+```
+
+To run tests with secret based configuration:
+
+```shell script
+$ yaks test aws-s3-source-secret-conf.feature
+```
+
+To run tests with property based configuration:
+
+```shell script
+$ yaks test aws-s3-source-property-conf.feature
+```
+
+To run tests with URI binding:
+
+```shell script
+$ yaks test aws-s3-uri-binding.feature
+```
+
+To run tests with binding to Knative channel:
+
+```shell script
+$ yaks test aws-s3-inmem-binding.feature
+```
+
+You will be provided with the test log output and the test results.
diff --git a/test/aws-s3/amazonS3Client.groovy 
b/test/aws-s3/amazonS3Client.groovy
new file mode 100644
index 00000000..5c3ff8a0
--- /dev/null
+++ b/test/aws-s3/amazonS3Client.groovy
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.S3Client
+
+S3Client s3 = S3Client
+        .builder()
+        
.endpointOverride(URI.create("${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}"))
+        .credentialsProvider(StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(
+                        "${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}",
+                        "${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}")
+        ))
+        .region(Region.of("${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}"))
+        .build()
+
+s3.createBucket(b -> b.bucket("${aws.s3.bucketNameOrArn}"))
+
+return s3
diff --git a/test/aws-s3/aws-s3-credentials.properties 
b/test/aws-s3/aws-s3-credentials.properties
new file mode 100644
index 00000000..f9dd1e10
--- /dev/null
+++ b/test/aws-s3/aws-s3-credentials.properties
@@ -0,0 +1,7 @@
+# Please add your AWS S3 account credentials
+camel.kamelet.aws-s3-source.aws-s3-credentials.bucketNameOrArn=${aws.s3.bucketNameOrArn}
+camel.kamelet.aws-s3-source.aws-s3-credentials.overrideEndpoint=true
+camel.kamelet.aws-s3-source.aws-s3-credentials.uriEndpointOverride=${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}
+camel.kamelet.aws-s3-source.aws-s3-credentials.secretKey=${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}
+camel.kamelet.aws-s3-source.aws-s3-credentials.accessKey=${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
+camel.kamelet.aws-s3-source.aws-s3-credentials.region=${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}
diff --git a/test/aws-s3/aws-s3-inmem-binding.feature 
b/test/aws-s3/aws-s3-inmem-binding.feature
new file mode 100644
index 00000000..d67e7798
--- /dev/null
+++ b/test/aws-s3/aws-s3-inmem-binding.feature
@@ -0,0 +1,49 @@
+@knative
+Feature: AWS S3 Kamelet - binding to InMemoryChannel
+
+  Background:
+    Given Kamelet aws-s3-source is available
+    Given variables
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    And log 'Started LocalStack container: 
${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create AWS-S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+
+  Scenario: Create Knative broker and channel
+    Given create Knative broker default
+    And Knative broker default is running
+    Given create Knative channel messages
+
+  Scenario: Create AWS-S3 Kamelet to InMemoryChannel binding
+    Given variable loginfo is "Installed features"
+    Given load KameletBinding aws-s3-to-inmem.yaml
+    Given load KameletBinding inmem-to-log.yaml
+    Then KameletBinding aws-s3-to-inmem should be available
+    And KameletBinding inmem-to-log should be available
+    And Camel K integration aws-s3-to-inmem is running
+    And Camel K integration inmem-to-log is running
+    And Camel K integration aws-s3-to-inmem should print ${loginfo}
+    And Camel K integration inmem-to-log should print ${loginfo}
+    Then sleep 10000 ms
+
+  Scenario: Verify Kamelet source
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange 
to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with 
body: ${aws.s3.message}
+    Then Camel K integration inmem-to-log should print ${aws.s3.message}
+
+  Scenario: Remove resources
+    Given delete KameletBinding aws-s3-to-inmem
+    Given delete KameletBinding inmem-to-log
+    Given delete Knative broker default
+    Given delete Knative channel messages
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-source-property-conf.feature 
b/test/aws-s3/aws-s3-source-property-conf.feature
new file mode 100644
index 00000000..93a2d353
--- /dev/null
+++ b/test/aws-s3/aws-s3-source-property-conf.feature
@@ -0,0 +1,37 @@
+Feature: AWS S3 Kamelet - property based config
+
+  Background:
+    Given Kamelet aws-s3-source is available
+    Given variables
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    And log 'Started LocalStack container: 
${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create AWS-S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+
+  Scenario: Create AWS-S3 Kamelet to log binding
+    Given Camel K integration property file aws-s3-credentials.properties
+    Given create Camel K integration aws-s3-to-log-prop-based.groovy
+    """
+    from("kamelet:aws-s3-source/aws-s3-credentials")
+      .to("log:info")
+    """
+    Then Camel K integration aws-s3-to-log-prop-based should be running
+
+  Scenario: Verify Kamelet source
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange 
to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with 
body: ${aws.s3.message}
+    Then Camel K integration aws-s3-to-log-prop-based should print 
${aws.s3.message}
+
+  Scenario: Remove Camel K resources
+    Given delete Camel K integration aws-s3-to-log-prop-based
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-source-secret-conf.feature 
b/test/aws-s3/aws-s3-source-secret-conf.feature
new file mode 100644
index 00000000..78ee9be5
--- /dev/null
+++ b/test/aws-s3/aws-s3-source-secret-conf.feature
@@ -0,0 +1,39 @@
+@ignored
+Feature: AWS S3 Kamelet - secret based config
+
+  Background:
+    Given Kamelet aws-s3-source is available
+    Given variables
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    And log 'Started LocalStack container: 
${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create AWS-S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+
+  Scenario: Create AWS-S3 Kamelet to log binding
+    Given create Kubernetes secret aws-s3-source-credentials
+      | aws-s3-credentials.properties | 
citrus:encodeBase64(citrus:readFile(aws-s3-credentials.properties)) |
+    Given create labels on Kubernetes secret aws-s3-source-credentials
+      | camel.apache.org/kamelet               | aws-s3-source |
+      | camel.apache.org/kamelet.configuration | aws-s3-credentials |
+    Given load Camel K integration aws-s3-to-log-secret-based.groovy
+    Then Camel K integration aws-s3-to-log-secret-based should be running
+
+  Scenario: Verify Kamelet source
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange 
to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with 
body: ${aws.s3.message}
+    Then Camel K integration aws-s3-to-log-secret-based should print 
${aws.s3.message}
+
+  Scenario: Remove resources
+    Given delete Camel K integration aws-s3-to-log-secret-based
+    Given delete Kubernetes secret aws-s3-source-credentials
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-source-uri-conf.feature 
b/test/aws-s3/aws-s3-source-uri-conf.feature
new file mode 100644
index 00000000..ca65ba7d
--- /dev/null
+++ b/test/aws-s3/aws-s3-source-uri-conf.feature
@@ -0,0 +1,32 @@
+Feature: AWS S3 Kamelet - URI based config
+
+  Background:
+    Given Kamelet aws-s3-source is available
+    Given variables
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    And log 'Started LocalStack container: 
${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+
+  Scenario: Create AWS-S3 Kamelet to log binding
+    Given load Camel K integration aws-s3-to-log-uri-based.groovy
+    Then Camel K integration aws-s3-to-log-uri-based should be running
+
+  Scenario: Verify Kamelet source
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange 
to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with 
body: ${aws.s3.message}
+    Then Camel K integration aws-s3-to-log-uri-based should print 
${aws.s3.message}
+
+  Scenario: Remove Camel K resources
+    Given delete Camel K integration aws-s3-to-log-uri-based
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-to-inmem.yaml b/test/aws-s3/aws-s3-to-inmem.yaml
new file mode 100644
index 00000000..ce880028
--- /dev/null
+++ b/test/aws-s3/aws-s3-to-inmem.yaml
@@ -0,0 +1,39 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: aws-s3-to-inmem
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: aws-s3-source
+    properties:
+      bucketNameOrArn: ${aws.s3.bucketNameOrArn}
+      overrideEndpoint: true
+      uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}
+      accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
+      secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}
+      region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}
+  sink:
+    ref:
+      kind: InMemoryChannel
+      apiVersion: messaging.knative.dev/v1
+      name: messages
diff --git a/test/aws-s3/aws-s3-to-log-secret-based.groovy 
b/test/aws-s3/aws-s3-to-log-secret-based.groovy
new file mode 100644
index 00000000..02fb1c58
--- /dev/null
+++ b/test/aws-s3/aws-s3-to-log-secret-based.groovy
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// camel-k: language=groovy
+
+from("kamelet:aws-s3-source/aws-s3-credentials")
+  .to("log:info")
diff --git a/test/aws-s3/aws-s3-to-log-uri-based.groovy 
b/test/aws-s3/aws-s3-to-log-uri-based.groovy
new file mode 100644
index 00000000..145b5510
--- /dev/null
+++ b/test/aws-s3/aws-s3-to-log-uri-based.groovy
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// camel-k: language=groovy
+
+def parameters = 'bucketNameOrArn=${aws.s3.bucketNameOrArn}&'+
+                 'overrideEndpoint=true&' +
+                 
'uriEndpointOverride=${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}&' +
+                 'accessKey=${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}&' +
+                 'secretKey=${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}&'+
+                 'region=${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}&'+
+                 'deleteAfterRead=true'
+
+from("kamelet:aws-s3-source?$parameters")
+  .to("log:info")
diff --git a/test/aws-s3/aws-s3-uri-binding.feature 
b/test/aws-s3/aws-s3-uri-binding.feature
new file mode 100644
index 00000000..ace19177
--- /dev/null
+++ b/test/aws-s3/aws-s3-uri-binding.feature
@@ -0,0 +1,35 @@
+Feature: AWS S3 Kamelet - binding to URI
+
+  Background:
+    Given Kamelet aws-s3-source is available
+    Given variables
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    And log 'Started LocalStack container: 
${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create AWS-S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+
+  Scenario: Create AWS-S3 Kamelet to log binding
+    Given variable loginfo is "Installed features"
+    When load KameletBinding aws-s3-uri-binding.yaml
+    And KameletBinding aws-s3-uri-binding is available
+    And Camel K integration aws-s3-uri-binding is running
+    Then Camel K integration aws-s3-uri-binding should print ${loginfo}
+
+  Scenario: Verify Kamelet source
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange 
to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with 
body: ${aws.s3.message}
+    Then Camel K integration aws-s3-uri-binding should print ${aws.s3.message}
+
+  Scenario: Remove Camel K resources
+    Given delete KameletBinding aws-s3-uri-binding
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-uri-binding.yaml 
b/test/aws-s3/aws-s3-uri-binding.yaml
new file mode 100644
index 00000000..50522818
--- /dev/null
+++ b/test/aws-s3/aws-s3-uri-binding.yaml
@@ -0,0 +1,37 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: aws-s3-uri-binding
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: aws-s3-source
+    properties:
+      bucketNameOrArn: ${aws.s3.bucketNameOrArn}
+      overrideEndpoint: true
+      outputFormat: json
+      uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}
+      accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
+      secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}
+      region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}
+  sink:
+    uri: log:info
diff --git a/test/aws-s3/yaks-config.yaml b/test/aws-s3/yaks-config.yaml
new file mode 100644
index 00000000..f36d136c
--- /dev/null
+++ b/test/aws-s3/yaks-config.yaml
@@ -0,0 +1,65 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+config:
+  namespace:
+    temporary: false
+  runtime:
+    testcontainers:
+      enabled: true
+    env:
+      - name: YAKS_CAMEL_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KAMELETS_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KNATIVE_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_TESTCONTAINERS_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: CITRUS_TYPE_CONVERTER
+        value: camel
+    resources:
+      - amazonS3Client.groovy
+      - aws-s3-credentials.properties
+      - aws-s3-to-log-uri-based.groovy
+      - aws-s3-to-log-secret-based.groovy
+      - aws-s3-uri-binding.yaml
+      - aws-s3-to-inmem.yaml
+      - ../utils/inmem-to-log.yaml
+    cucumber:
+      tags:
+        - "not @ignored"
+    settings:
+      dependencies:
+        - groupId: com.amazonaws
+          artifactId: aws-java-sdk-kinesis
+          version: "@aws-java-sdk.version@"
+        - groupId: org.apache.camel
+          artifactId: camel-aws2-s3
+          version: "@camel.version@"
+        - groupId: org.apache.camel
+          artifactId: camel-jackson
+          version: "@camel.version@"
+  dump:
+    enabled: true
+    failedOnly: true
+    includes:
+      - app=camel-k
diff --git a/test/utils/inmem-to-log.yaml b/test/utils/inmem-to-log.yaml
new file mode 100644
index 00000000..8b5dc51e
--- /dev/null
+++ b/test/utils/inmem-to-log.yaml
@@ -0,0 +1,29 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: inmem-to-log
+spec:
+  source:
+    ref:
+      kind: InMemoryChannel
+      apiVersion: messaging.knative.dev/v1
+      name: messages
+  sink:
+    uri: log:info

Reply via email to