This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit a0406cc7a0587decd5eba2fe8f39a5ee5b0302e8
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Wed Nov 30 22:02:13 2022 +0100

    Revert existing Kamelets to not use data type converter
    
    - AWS S3 source Kamelet
    - AWS DDB sink Kamelet
    - JsonToDdbModelConverter utility and unit tests
---
 kamelets/aws-ddb-sink.kamelet.yaml                 |  25 +--
 kamelets/aws-s3-source.kamelet.yaml                |  19 --
 .../transform/aws/ddb/JsonToDdbModelConverter.java | 201 +++++++++++++++++++++
 .../aws/ddb/JsonToDdbModelConverterTest.java       | 184 +++++++++++++++++++
 .../resources/kamelets/aws-ddb-sink.kamelet.yaml   |  25 +--
 .../resources/kamelets/aws-s3-source.kamelet.yaml  |  19 --
 6 files changed, 395 insertions(+), 78 deletions(-)

diff --git a/kamelets/aws-ddb-sink.kamelet.yaml 
b/kamelets/aws-ddb-sink.kamelet.yaml
index 87b338ee..5b603abf 100644
--- a/kamelets/aws-ddb-sink.kamelet.yaml
+++ b/kamelets/aws-ddb-sink.kamelet.yaml
@@ -97,12 +97,6 @@ spec:
         x-descriptors:
           - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
         default: false
-      inputFormat:
-        title: Input Type
-        description: Specify the input type for this Kamelet. The Kamelet will 
automatically apply conversion logic in order to transform message content to 
this data type.
-        type: string
-        default: json
-        example: json
   types:
     in:
       mediaType: application/json
@@ -113,26 +107,17 @@ spec:
   - "camel:aws2-ddb"
   - "camel:kamelet"
   template:
-    beans:
-    - name: dataTypeRegistry
-      type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
-    - name: inputTypeProcessor
-      type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
-      property:
-        - key: scheme
-          value: 'aws2-ddb'
-        - key: format
-          value: '{{inputFormat}}'
-        - key: registry
-          value: '#bean:{{dataTypeRegistry}}'
     from:
       uri: "kamelet:source"
       steps:
       - set-property:
           name: operation
           constant: "{{operation}}"
-      - process:
-          ref: "{{inputTypeProcessor}}"
+      - unmarshal:
+          json:
+            library: Jackson
+            unmarshalType: com.fasterxml.jackson.databind.JsonNode
+      - bean: 
"org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter"
       - to:
           uri: "aws2-ddb:{{table}}"
           parameters:
diff --git a/kamelets/aws-s3-source.kamelet.yaml 
b/kamelets/aws-s3-source.kamelet.yaml
index d937f6e5..6ab2bca4 100644
--- a/kamelets/aws-s3-source.kamelet.yaml
+++ b/kamelets/aws-s3-source.kamelet.yaml
@@ -107,12 +107,6 @@ spec:
         description: The number of milliseconds before the next poll of the 
selected bucket.
         type: integer
         default: 500
-      outputFormat:
-        title: Output Type
-        description: Choose the output type for this Kamelet. The Kamelet 
supports different output types and performs automatic message conversion 
according to this data type.
-        type: string
-        default: binary
-        example: binary
   dependencies:
     - "camel:core"
     - "camel:aws2-s3"
@@ -120,17 +114,6 @@ spec:
     - "camel:kamelet"
   template:
     beans:
-      - name: dataTypeRegistry
-        type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
-      - name: outputTypeProcessor
-        type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
-        property:
-          - key: scheme
-            value: 'aws2-s3'
-          - key: format
-            value: '{{outputFormat}}'
-          - key: registry
-            value: '#bean:{{dataTypeRegistry}}'
       - name: renameHeaders
         type: 
"#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:
@@ -160,6 +143,4 @@ spec:
       steps:
       - process:
           ref: "{{renameHeaders}}"
-      - process:
-          ref: "{{outputTypeProcessor}}"
       - to: "kamelet:sink"
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
new file mode 100644
index 00000000..2a203ed0
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kamelets.utils.transform.aws.ddb;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+
+/**
+ * Maps Json body to DynamoDB attribute value map and sets the attribute map 
as Camel DynamoDB header entries.
+ *
+ * Json property names map to attribute keys and Json property values map to 
attribute values.
+ *
+ * During mapping the Json property types resolve to the respective attribute 
types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}).
+ * Primitive typed arrays in Json get mapped to {@code StringSet} or {@code 
NumberSet} attribute values.
+ *
+ * For PutItem operation the Json body defines all item attributes.
+ *
+ * For DeleteItem operation the Json body defines only the primary key 
attributes that identify the item to delete.
+ *
+ * For UpdateItem operation the Json body defines both key attributes to 
identify the item to be updated and all item attributes tht get updated on the 
item.
+ *
+ * The given Json body can use "key" and "item" as top level properties.
+ * Both define a Json object that will be mapped to respective attribute value 
maps:
+ * <pre>{@code
+ * {
+ *   "key": {},
+ *   "item": {}
+ * }
+ * }
+ * </pre>
+ * The converter will extract the objects and set respective attribute value 
maps as header entries.
+ * This is a comfortable way to define different key and item attribute value 
maps e.g. on UpdateItem operation.
+ *
+ * In case key and item attribute value maps are identical you can omit the 
special top level properties completely.
+ * The converter will map the whole Json body as is then and use it as source 
for the attribute value map.
+ */
+public class JsonToDdbModelConverter {
+
+    public String process(@ExchangeProperty("operation") String operation, 
Exchange exchange) throws InvalidPayloadException {
+        if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) 
||
+                
exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) {
+            return "";
+        }
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        JsonNode jsonBody = 
exchange.getMessage().getMandatoryBody(JsonNode.class);
+
+        JsonNode key = jsonBody.get("key");
+        JsonNode item = jsonBody.get("item");
+
+        Map<String, Object> keyProps;
+        if (key != null) {
+            keyProps = mapper.convertValue(key, new TypeReference<Map<String, 
Object>>(){});
+        } else {
+            keyProps = mapper.convertValue(jsonBody, new 
TypeReference<Map<String, Object>>(){});
+        }
+
+        Map<String, Object> itemProps;
+        if (item != null) {
+            itemProps = mapper.convertValue(item, new 
TypeReference<Map<String, Object>>(){});
+        } else {
+            itemProps = keyProps;
+        }
+
+        final Map<String, AttributeValue> keyMap = 
getAttributeValueMap(keyProps);
+
+        switch (Ddb2Operations.valueOf(operation)) {
+            case PutItem:
+                exchange.getMessage().setHeader(Ddb2Constants.OPERATION, 
Ddb2Operations.PutItem);
+                exchange.getMessage().setHeader(Ddb2Constants.ITEM, 
getAttributeValueMap(itemProps));
+                setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, 
ReturnValue.ALL_OLD.toString(), exchange);
+                break;
+            case UpdateItem:
+                exchange.getMessage().setHeader(Ddb2Constants.OPERATION, 
Ddb2Operations.UpdateItem);
+                exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap);
+                exchange.getMessage().setHeader(Ddb2Constants.UPDATE_VALUES, 
getAttributeValueUpdateMap(itemProps));
+                setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, 
ReturnValue.ALL_NEW.toString(), exchange);
+                break;
+            case DeleteItem:
+                exchange.getMessage().setHeader(Ddb2Constants.OPERATION, 
Ddb2Operations.DeleteItem);
+                exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap);
+                setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, 
ReturnValue.ALL_OLD.toString(), exchange);
+                break;
+            default:
+                throw new 
UnsupportedOperationException(String.format("Unsupported operation '%s'", 
operation));
+        }
+
+        return "";
+    }
+
+    private void setHeaderIfNotPresent(String headerName, Object value, 
Exchange exchange) {
+        exchange.getMessage().setHeader(headerName, value);
+    }
+
+    private Map<String, AttributeValue> getAttributeValueMap(Map<String, 
Object> body) {
+        final Map<String, AttributeValue> attributeValueMap = new 
LinkedHashMap<>();
+
+        for (Map.Entry<String, Object> attribute : body.entrySet()) {
+            attributeValueMap.put(attribute.getKey(), 
getAttributeValue(attribute.getValue()));
+        }
+
+        return attributeValueMap;
+    }
+
+    private Map<String, AttributeValueUpdate> 
getAttributeValueUpdateMap(Map<String, Object> body) {
+        final Map<String, AttributeValueUpdate> attributeValueMap = new 
LinkedHashMap<>();
+
+        for (Map.Entry<String, Object> attribute : body.entrySet()) {
+            attributeValueMap.put(attribute.getKey(), 
getAttributeValueUpdate(attribute.getValue()));
+        }
+
+        return attributeValueMap;
+    }
+
+    private static AttributeValue getAttributeValue(Object value) {
+        if (value == null) {
+            return AttributeValue.builder().nul(true).build();
+        }
+
+        if (value instanceof String) {
+            return AttributeValue.builder().s(value.toString()).build();
+        }
+
+        if (value instanceof Integer) {
+            return AttributeValue.builder().n(value.toString()).build();
+        }
+
+        if (value instanceof Boolean) {
+            return AttributeValue.builder().bool((Boolean) value).build();
+        }
+
+        if (value instanceof String[]) {
+            return AttributeValue.builder().ss((String[]) value).build();
+        }
+
+        if (value instanceof int[]) {
+            return AttributeValue.builder().ns(Stream.of((int[]) 
value).map(Object::toString).collect(Collectors.toList())).build();
+        }
+
+        if (value instanceof List) {
+            List<?> values = ((List<?>) value);
+
+            if (values.isEmpty()) {
+                return AttributeValue.builder().ss().build();
+            } else if (values.get(0) instanceof Integer) {
+                return 
AttributeValue.builder().ns(values.stream().map(Object::toString).collect(Collectors.toList())).build();
+            } else {
+                return 
AttributeValue.builder().ss(values.stream().map(Object::toString).collect(Collectors.toList())).build();
+            }
+        }
+
+        if (value instanceof Map) {
+            Map<String, AttributeValue> nestedAttributes = new 
LinkedHashMap<>();
+
+            for (Map.Entry<?, ?> nested : ((Map<?, ?>) value).entrySet()) {
+                nestedAttributes.put(nested.getKey().toString(), 
getAttributeValue(nested.getValue()));
+            }
+
+            return AttributeValue.builder().m(nestedAttributes).build();
+        }
+
+        return AttributeValue.builder().s(value.toString()).build();
+    }
+
+    private static AttributeValueUpdate getAttributeValueUpdate(Object value) {
+        return AttributeValueUpdate.builder()
+                .action(AttributeAction.PUT)
+                .value(getAttributeValue(value)).build();
+    }
+}
\ No newline at end of file
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
new file mode 100644
index 00000000..e88dce4e
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kamelets.utils.transform.aws.ddb;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+
+class JsonToDdbModelConverterTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private final JsonToDdbModelConverter processor = new 
JsonToDdbModelConverter();
+
+    private final String keyJson = "{" +
+            "\"name\": \"Rajesh Koothrappali\"" +
+            "}";
+
+    private final String itemJson = "{" +
+            "\"name\": \"Rajesh Koothrappali\"," +
+            "\"age\": 29," +
+            "\"super-heroes\": [\"batman\", \"spiderman\", \"wonderwoman\"]," +
+            "\"issues\": [5, 3, 9, 1]," +
+            "\"girlfriend\": null," +
+            "\"doctorate\": true" +
+            "}";
+
+    @BeforeEach
+    void setup() {
+        this.camelContext = new DefaultCamelContext();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapPutItemHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(itemJson));
+
+        processor.process(Ddb2Operations.PutItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.PutItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), 
exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        
assertAttributeValueMap(exchange.getMessage().getHeader(Ddb2Constants.ITEM, 
Map.class));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapUpdateItemHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + 
", \"item\": " + itemJson + "}"));
+
+        processor.process(Ddb2Operations.UpdateItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.UpdateItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_NEW.toString(), 
exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = 
exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class);
+        Assertions.assertEquals(1L, attributeValueMap.size());
+        Assertions.assertEquals(AttributeValue.builder().s("Rajesh 
Koothrappali").build(), attributeValueMap.get("name"));
+
+        
assertAttributeValueUpdateMap(exchange.getMessage().getHeader(Ddb2Constants.UPDATE_VALUES,
 Map.class));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapDeleteItemHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + 
"}"));
+
+        processor.process(Ddb2Operations.DeleteItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.DeleteItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), 
exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = 
exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class);
+        Assertions.assertEquals(1L, attributeValueMap.size());
+        Assertions.assertEquals(AttributeValue.builder().s("Rajesh 
Koothrappali").build(), attributeValueMap.get("name"));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapNestedObjects() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson 
+ "}"));
+
+        processor.process(Ddb2Operations.PutItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.PutItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), 
exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = 
exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class);
+        Assertions.assertEquals(1L, attributeValueMap.size());
+
+        
Assertions.assertEquals("AttributeValue(M={name=AttributeValue(S=Rajesh 
Koothrappali), " +
+                "age=AttributeValue(N=29), " +
+                "super-heroes=AttributeValue(SS=[batman, spiderman, 
wonderwoman]), " +
+                "issues=AttributeValue(NS=[5, 3, 9, 1]), " +
+                "girlfriend=AttributeValue(NUL=true), " +
+                "doctorate=AttributeValue(BOOL=true)})", 
attributeValueMap.get("user").toString());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapEmptyJson() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{}"));
+
+        processor.process(Ddb2Operations.PutItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.PutItem, 
exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), 
exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = 
exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class);
+        Assertions.assertEquals(0L, attributeValueMap.size());
+    }
+
+    @Test()
+    void shouldFailForUnsupportedOperation() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{}"));
+
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> 
processor.process(Ddb2Operations.BatchGetItems.name(), exchange));
+    }
+
+    private void assertAttributeValueMap(Map<String, AttributeValue> 
attributeValueMap) {
+        Assertions.assertEquals(6L, attributeValueMap.size());
+        Assertions.assertEquals(AttributeValue.builder().s("Rajesh 
Koothrappali").build(), attributeValueMap.get("name"));
+        Assertions.assertEquals(AttributeValue.builder().n("29").build(), 
attributeValueMap.get("age"));
+        Assertions.assertEquals(AttributeValue.builder().ss("batman", 
"spiderman", "wonderwoman").build(), attributeValueMap.get("super-heroes"));
+        Assertions.assertEquals(AttributeValue.builder().ns("5", "3", "9", 
"1").build(), attributeValueMap.get("issues"));
+        Assertions.assertEquals(AttributeValue.builder().nul(true).build(), 
attributeValueMap.get("girlfriend"));
+        Assertions.assertEquals(AttributeValue.builder().bool(true).build(), 
attributeValueMap.get("doctorate"));
+    }
+
+    private void assertAttributeValueUpdateMap(Map<String, 
AttributeValueUpdate> attributeValueMap) {
+        Assertions.assertEquals(6L, attributeValueMap.size());
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh
 Koothrappali").build()).action(AttributeAction.PUT).build(), 
attributeValueMap.get("name"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build()).action(AttributeAction.PUT).build(),
 attributeValueMap.get("age"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman",
 "spiderman", "wonderwoman").build()).action(AttributeAction.PUT).build(), 
attributeValueMap.get("super-heroes"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5",
 "3", "9", "1").build()).action(AttributeAction.PUT).build(), 
attributeValueMap.get("issues"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build()).action(AttributeAction.PUT).build(),
 attributeValueMap.get("girlfriend"));
+        
Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build()).action(AttributeAction.PUT).build(),
 attributeValueMap.get("doctorate"));
+    }
+}
\ No newline at end of file
diff --git 
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml 
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
index 87b338ee..5b603abf 100644
--- 
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
+++ 
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
@@ -97,12 +97,6 @@ spec:
         x-descriptors:
           - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
         default: false
-      inputFormat:
-        title: Input Type
-        description: Specify the input type for this Kamelet. The Kamelet will 
automatically apply conversion logic in order to transform message content to 
this data type.
-        type: string
-        default: json
-        example: json
   types:
     in:
       mediaType: application/json
@@ -113,26 +107,17 @@ spec:
   - "camel:aws2-ddb"
   - "camel:kamelet"
   template:
-    beans:
-    - name: dataTypeRegistry
-      type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
-    - name: inputTypeProcessor
-      type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
-      property:
-        - key: scheme
-          value: 'aws2-ddb'
-        - key: format
-          value: '{{inputFormat}}'
-        - key: registry
-          value: '#bean:{{dataTypeRegistry}}'
     from:
       uri: "kamelet:source"
       steps:
       - set-property:
           name: operation
           constant: "{{operation}}"
-      - process:
-          ref: "{{inputTypeProcessor}}"
+      - unmarshal:
+          json:
+            library: Jackson
+            unmarshalType: com.fasterxml.jackson.databind.JsonNode
+      - bean: 
"org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter"
       - to:
           uri: "aws2-ddb:{{table}}"
           parameters:
diff --git 
a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml 
b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
index d937f6e5..6ab2bca4 100644
--- 
a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
+++ 
b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
@@ -107,12 +107,6 @@ spec:
         description: The number of milliseconds before the next poll of the 
selected bucket.
         type: integer
         default: 500
-      outputFormat:
-        title: Output Type
-        description: Choose the output type for this Kamelet. The Kamelet 
supports different output types and performs automatic message conversion 
according to this data type.
-        type: string
-        default: binary
-        example: binary
   dependencies:
     - "camel:core"
     - "camel:aws2-s3"
@@ -120,17 +114,6 @@ spec:
     - "camel:kamelet"
   template:
     beans:
-      - name: dataTypeRegistry
-        type: 
"#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
-      - name: outputTypeProcessor
-        type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
-        property:
-          - key: scheme
-            value: 'aws2-s3'
-          - key: format
-            value: '{{outputFormat}}'
-          - key: registry
-            value: '#bean:{{dataTypeRegistry}}'
       - name: renameHeaders
         type: 
"#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:
@@ -160,6 +143,4 @@ spec:
       steps:
       - process:
           ref: "{{renameHeaders}}"
-      - process:
-          ref: "{{outputTypeProcessor}}"
       - to: "kamelet:sink"

Reply via email to