This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit 47814237ff1a52d3aa11a59a092c54189d1ea866
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Wed Mar 9 16:13:39 2022 +0100

    chore: Add Kamelet aws-ddb-sink
    
    - Add sink Kamelet for AWS DynamoDB (see the binding sketch below)
    - Use YAKS version 0.9.0-202203140033
    - Add integration test for aws-ddb-sink Kamelet
    - Introduce unit tests on camel-kamelets-utils module
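    
    As a quick reference, a minimal KameletBinding sketch wiring the new sink
    (table, period and payload are illustrative; the actual test binding is in
    test/aws-ddb-sink/aws-ddb-sink-binding.yaml below):
    
        apiVersion: camel.apache.org/v1alpha1
        kind: KameletBinding
        metadata:
          name: ddb-sink-example
        spec:
          source:
            ref:
              kind: Kamelet
              apiVersion: camel.apache.org/v1alpha1
              name: timer-source
            properties:
              period: 10000
              message: '{"id": 1, "title": "Star Wars IV"}'
          sink:
            ref:
              kind: Kamelet
              apiVersion: camel.apache.org/v1alpha1
              name: aws-ddb-sink
            properties:
              table: movies
              operation: PutItem
              region: eu-west-1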
---
 .github/workflows/yaks-tests.yaml                  |   6 +-
 .../aws-ddb-sink.kamelet.yaml                      |  91 ++++++----
 kamelets/aws-ddb-streams-source.kamelet.yaml       |   4 +-
 kamelets/aws-s3-source.kamelet.yaml                |   6 +-
 library/camel-kamelets-utils/pom.xml               |  42 +++++
 .../transform/aws/ddb/JsonToDdbModelConverter.java | 201 +++++++++++++++++++++
 .../aws/ddb/JsonToDdbModelConverterTest.java       | 194 ++++++++++++++++++++
 ...urce.kamelet.yaml => aws-ddb-sink.kamelet.yaml} |  91 ++++++----
 .../kamelets/aws-ddb-streams-source.kamelet.yaml   |   4 +-
 .../resources/kamelets/aws-s3-source.kamelet.yaml  |   6 +-
 test/aws-ddb-sink/amazonDDBClient.groovy           |  53 ++++++
 .../aws-ddb-sink-binding.yaml}                     |  65 +++----
 test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature  |  48 +++++
 test/aws-ddb-sink/aws-ddb-sink-putItem.feature     |  41 +++++
 test/aws-ddb-sink/aws-ddb-sink-updateItem.feature  |  51 ++++++
 test/aws-ddb-sink/putItem.groovy                   |  30 +++
 test/aws-ddb-sink/verifyItems.groovy               |  18 ++
 test/aws-ddb-sink/yaks-config.yaml                 |  71 ++++++++
 test/earthquake-source/earthquake-source.feature   |   2 +-
 .../insert-field-action.feature                    |   4 +-
 test/mail-sink/mail-sink.feature                   |   8 +-
 test/mail-sink/yaks-config.yaml                    |   2 +
 test/timer-source/timer-source.feature             |   2 +-
 23 files changed, 923 insertions(+), 117 deletions(-)

diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml
index fcede74..11cf6ab 100644
--- a/.github/workflows/yaks-tests.yaml
+++ b/.github/workflows/yaks-tests.yaml
@@ -42,8 +42,8 @@ concurrency:
 
 env:
   CAMEL_K_VERSION: 1.7.0
-  YAKS_VERSION: 0.8.0
-  YAKS_IMAGE_NAME: "docker.io/citrusframework/yaks"
+  YAKS_VERSION: 0.9.0-202203140033
+  YAKS_IMAGE_NAME: "docker.io/yaks/yaks"
 
 jobs:
   test:
@@ -63,7 +63,7 @@ jobs:
         rm -r _kamel
     - name: Get YAKS CLI
       run: |
-        curl --fail -L --silent https://github.com/citrusframework/yaks/releases/download/v${YAKS_VERSION}/yaks-${YAKS_VERSION}-linux-64bit.tar.gz -o yaks.tar.gz
+        curl --fail -L --silent https://github.com/citrusframework/yaks/releases/download/${YAKS_VERSION}/yaks-${YAKS_VERSION}-linux-64bit.tar.gz -o yaks.tar.gz
         mkdir -p _yaks
         tar -zxf yaks.tar.gz --directory ./_yaks
         sudo mv ./_yaks/yaks /usr/local/bin/
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml b/kamelets/aws-ddb-sink.kamelet.yaml
similarity index 60%
copy from library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
copy to kamelets/aws-ddb-sink.kamelet.yaml
index 512e139..2080bd2 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
+++ b/kamelets/aws-ddb-sink.kamelet.yaml
@@ -18,7 +18,7 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: Kamelet
 metadata:
-  name: aws-ddb-streams-source
+  name: aws-ddb-sink
   annotations:
     camel.apache.org/kamelet.support.level: "Preview"
     camel.apache.org/catalog.version: "main-SNAPSHOT"
@@ -26,16 +26,22 @@ metadata:
     camel.apache.org/provider: "Apache Software Foundation"
     camel.apache.org/kamelet.group: "AWS DynamoDB Streams"
   labels:
-    camel.apache.org/kamelet.type: "source"
+    camel.apache.org/kamelet.type: "sink"
 spec:
   definition:
-    title: "AWS DynamoDB Streams Source"
+    title: "AWS DynamoDB Sink"
     description: |-
-      Receive events from AWS DynamoDB Streams.
+      Send data to the AWS DynamoDB service. The sent data will insert, update, or delete an item on the given AWS DynamoDB table.
 
-      Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB Streams Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'.
-      
-      When using a default Credentials Provider the AWS DynamoDB Streams client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet.
+      Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB service. These parameters are optional because the Kamelet also provides the 'useDefaultCredentialsProvider' option.
+
+      When using a default Credentials Provider the AWS DynamoDB client will load the credentials through this provider and won't use the static credentials. This is the reason the access key and secret key are not mandatory parameters for this Kamelet.
+
+      This Kamelet expects a JSON body. The mapping between the JSON fields and the table attribute values is done by key, so for the input:
+
+      '{"username":"oscerd", "city":"Rome"}'
+
+      the Kamelet will insert or update an item in the given AWS DynamoDB table and set the attributes 'username' and 'city' accordingly. Please note that the JSON object must include the primary key values that define the item.
     required:
       - table
       - region
@@ -66,43 +66,64 @@ spec:
         description: The AWS region to connect to
         type: string
         example: eu-west-1
-      iteratorType:
-        title: Iterator Type
-        description: Defines where in the DynaboDB stream to start getting records. Note that using TRIM_HORIZON can cause a significant delay before the stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider MUST be supplied. There are 4 enums and the value can be one of TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
-        type: string
-        default: LATEST
-      sequenceNumberProvider:
-        title: Sequence Number Provider
-        description: Provider for the sequence number when using one of the two ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER iterator types. Can be a registry reference or a literal sequence number.
+      operation:
+        title: Operation
+        description: The operation to perform (one of PutItem, UpdateItem, DeleteItem)
         type: string
-        example: "900000000005745712447"
-        default: "000000000000000000000"
+        default: PutItem
+        example: PutItem
+      writeCapacity:
+        title: Write Capacity
+        description: The provisioned throughput to reserve for writing resources to your table
+        type: integer
+        default: 1
       useDefaultCredentialsProvider:
         title: Default Credentials Provider
         description: Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in.
         type: boolean
         x-descriptors:
-        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
+      uriEndpointOverride:
+        title: Override Endpoint URI
+        description: Set the overriding endpoint URI. This option needs to be used in combination with the overrideEndpoint option.
+        type: string
+      overrideEndpoint:
+        title: Endpoint Override
+        description: Set the need for overriding the endpoint URI. This option needs to be used in combination with the uriEndpointOverride setting.
+        type: boolean
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
   types:
-    out:
+    in:
       mediaType: application/json
   dependencies:
-  - "camel:gson"
+  - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
+  - "camel:core"
+  - "camel:jackson"
   - "camel:aws2-ddb"
   - "camel:kamelet"
   template:
     from:
-      uri: "aws2-ddbstream:{{table}}"
-      parameters:
-        secretKey: "{{?secretKey}}"
-        accessKey: "{{?accessKey}}"
-        region: "{{region}}"
-        iteratorType: "{{iteratorType}}"
-        sequenceNumberProvider: "{{sequenceNumberProvider}}"
-        useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}"
+      uri: "kamelet:source"
       steps:
-      - marshal:
-          json: 
-            library: Gson
-      - to: "kamelet:sink"
+      - set-property:
+          name: operation
+          constant: "{{operation}}"
+      - unmarshal:
+          json:
+            library: Jackson
+            unmarshalType: com.fasterxml.jackson.databind.JsonNode
+      - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter"
+      - to:
+          uri: "aws2-ddb:{{table}}"
+          parameters:
+            secretKey: "{{?secretKey}}"
+            accessKey: "{{?accessKey}}"
+            region: "{{region}}"
+            operation: "{{operation}}"
+            writeCapacity: "{{?writeCapacity}}"
+            useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}"
+            uriEndpointOverride: "{{?uriEndpointOverride}}"
+            overrideEndpoint: "{{overrideEndpoint}}"
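
To summarize the contract of the new sink: the body must be JSON, and the optional top-level 'key'/'item' properties select the key and item attribute maps (see the JsonToDdbModelConverter below). Example bodies per operation, with an illustrative 'id' primary key mirroring the test features further down:

    PutItem:    {"id": 1, "year": 1977, "title": "Star Wars IV"}
    UpdateItem: {"key": {"id": 1}, "item": {"year": 1977, "title": "A New Hope"}}
    DeleteItem: {"key": {"id": 1}}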
diff --git a/kamelets/aws-ddb-streams-source.kamelet.yaml b/kamelets/aws-ddb-streams-source.kamelet.yaml
index 512e139..31e62ef 100644
--- a/kamelets/aws-ddb-streams-source.kamelet.yaml
+++ b/kamelets/aws-ddb-streams-source.kamelet.yaml
@@ -34,7 +34,7 @@ spec:
       Receive events from AWS DynamoDB Streams.
 
       Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB Streams Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'.
-      
+
       When using a default Credentials Provider the AWS DynamoDB Streams client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet.
     required:
       - table
@@ -103,6 +103,6 @@ spec:
         useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}"
       steps:
       - marshal:
-          json: 
+          json:
             library: Gson
       - to: "kamelet:sink"
diff --git a/kamelets/aws-s3-source.kamelet.yaml b/kamelets/aws-s3-source.kamelet.yaml
index 439777d..ce9439a 100644
--- a/kamelets/aws-s3-source.kamelet.yaml
+++ b/kamelets/aws-s3-source.kamelet.yaml
@@ -17,7 +17,7 @@ spec:
       Receive data from AWS S3 Bucket.
 
       Access Key/Secret Key are the basic method for authenticating to the AWS S3 Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'.
-      
+
       When using a default Credentials Provider the S3 client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet.
     required:
       - bucketNameOrArn
@@ -58,14 +58,14 @@ spec:
         example: eu-west-1
       autoCreateBucket:
         title: Autocreate Bucket
-        description: Setting the autocreation of the S3 bucket bucketName. 
+        description: Setting the autocreation of the S3 bucket bucketName.
         type: boolean
         x-descriptors:
         - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
         default: false
       includeBody:
         title: Include Body
-        description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. 
+        description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata.
         type: boolean
         x-descriptors:
         - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
diff --git a/library/camel-kamelets-utils/pom.xml b/library/camel-kamelets-utils/pom.xml
index cb71d07..4f848d3 100644
--- a/library/camel-kamelets-utils/pom.xml
+++ b/library/camel-kamelets-utils/pom.xml
@@ -71,6 +71,48 @@
             <artifactId>camel-kafka</artifactId>
         </dependency>
 
+        <!-- AWS Dynamo DB camel component -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-ddb</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Test scoped dependencies -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>${junit.jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
new file mode 100644
index 0000000..c5098c1
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kamelets.utils.transform.aws.ddb;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+
+/**
+ * Maps the JSON body to a DynamoDB attribute value map and sets the attribute map as Camel DynamoDB header entries.
+ *
+ * JSON property names map to attribute keys and JSON property values map to attribute values.
+ *
+ * During mapping the JSON property types resolve to the respective attribute types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}).
+ * Primitive typed arrays in JSON get mapped to {@code StringSet} or {@code NumberSet} attribute values.
+ *
+ * For the PutItem operation the JSON body defines all item attributes.
+ *
+ * For the DeleteItem operation the JSON body defines only the primary key attributes that identify the item to delete.
+ *
+ * For the UpdateItem operation the JSON body defines both the key attributes that identify the item to be updated and all item attributes that get updated on the item.
+ *
+ * The given JSON body can use "key" and "item" as top-level properties.
+ * Both define a JSON object that will be mapped to respective attribute value maps:
+ * <pre>{@code
+ * {
+ *   "key": {},
+ *   "item": {}
+ * }
+ * }
+ * </pre>
+ * The converter will extract the objects and set the respective attribute value maps as header entries.
+ * This is a convenient way to define different key and item attribute value maps, e.g. on the UpdateItem operation.
+ *
+ * In case the key and item attribute value maps are identical you can omit the special top-level properties completely.
+ * The converter will then map the whole JSON body as is and use it as the source for the attribute value map.
+ */
+public class JsonToDdbModelConverter {
+
+    public String process(@ExchangeProperty("operation") String operation, Exchange exchange) throws InvalidPayloadException {
+        if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) ||
+                exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) {
+            return "";
+        }
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        JsonNode jsonBody = exchange.getMessage().getMandatoryBody(JsonNode.class);
+
+        JsonNode key = jsonBody.get("key");
+        JsonNode item = jsonBody.get("item");
+
+        Map<String, Object> keyProps;
+        if (key != null) {
+            keyProps = mapper.convertValue(key, new TypeReference<Map<String, Object>>(){});
+        } else {
+            keyProps = mapper.convertValue(jsonBody, new TypeReference<Map<String, Object>>(){});
+        }
+
+        Map<String, Object> itemProps;
+        if (item != null) {
+            itemProps = mapper.convertValue(item, new TypeReference<Map<String, Object>>(){});
+        } else {
+            itemProps = keyProps;
+        }
+
+        final Map<String, AttributeValue> keyMap = getAttributeValueMap(keyProps);
+
+        switch (Ddb2Operations.valueOf(operation)) {
+            case PutItem:
+                exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.PutItem);
+                exchange.getMessage().setHeader(Ddb2Constants.ITEM, getAttributeValueMap(itemProps));
+                setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_OLD.toString(), exchange);
+                break;
+            case UpdateItem:
+                exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.UpdateItem);
+                exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap);
+                exchange.getMessage().setHeader(Ddb2Constants.UPDATE_VALUES, getAttributeValueUpdateMap(itemProps));
+                setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_NEW.toString(), exchange);
+                break;
+            case DeleteItem:
+                exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.DeleteItem);
+                exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap);
+                setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_OLD.toString(), exchange);
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupported operation '%s'", operation));
+        }
+
+        return "";
+    }
+
+    private void setHeaderIfNotPresent(String headerName, Object value, Exchange exchange) {
+        // Only set the header when it is absent, as the method name implies.
+        exchange.getMessage().getHeaders().computeIfAbsent(headerName, k -> value);
+    }
+
+    private Map<String, AttributeValue> getAttributeValueMap(Map<String, Object> body) {
+        final Map<String, AttributeValue> attributeValueMap = new LinkedHashMap<>();
+
+        for (Map.Entry<String, Object> attribute : body.entrySet()) {
+            attributeValueMap.put(attribute.getKey(), getAttributeValue(attribute.getValue()));
+        }
+
+        return attributeValueMap;
+    }
+
+    private Map<String, AttributeValueUpdate> getAttributeValueUpdateMap(Map<String, Object> body) {
+        final Map<String, AttributeValueUpdate> attributeValueMap = new LinkedHashMap<>();
+
+        for (Map.Entry<String, Object> attribute : body.entrySet()) {
+            attributeValueMap.put(attribute.getKey(), getAttributeValueUpdate(attribute.getValue()));
+        }
+
+        return attributeValueMap;
+    }
+
+    private static AttributeValue getAttributeValue(Object value) {
+        if (value == null) {
+            return AttributeValue.builder().nul(true).build();
+        }
+
+        if (value instanceof String) {
+            return AttributeValue.builder().s(value.toString()).build();
+        }
+
+        if (value instanceof Integer) {
+            return AttributeValue.builder().n(value.toString()).build();
+        }
+
+        if (value instanceof Boolean) {
+            return AttributeValue.builder().bool((Boolean) value).build();
+        }
+
+        if (value instanceof String[]) {
+            return AttributeValue.builder().ss((String[]) value).build();
+        }
+
+        if (value instanceof int[]) {
+            // Stream.of((int[]) value) would yield a single-element Stream<int[]>, so box the ints explicitly.
+            return AttributeValue.builder().ns(IntStream.of((int[]) value).mapToObj(String::valueOf).collect(Collectors.toList())).build();
+        }
+
+        if (value instanceof List) {
+            List<?> values = ((List<?>) value);
+
+            if (values.isEmpty()) {
+                return AttributeValue.builder().ss().build();
+            } else if (values.get(0) instanceof Integer) {
+                return AttributeValue.builder().ns(values.stream().map(Object::toString).collect(Collectors.toList())).build();
+            } else {
+                return AttributeValue.builder().ss(values.stream().map(Object::toString).collect(Collectors.toList())).build();
+            }
+        }
+
+        if (value instanceof Map) {
+            Map<String, AttributeValue> nestedAttributes = new LinkedHashMap<>();
+
+            for (Map.Entry<?, ?> nested : ((Map<?, ?>) value).entrySet()) {
+                nestedAttributes.put(nested.getKey().toString(), getAttributeValue(nested.getValue()));
+            }
+
+            return AttributeValue.builder().m(nestedAttributes).build();
+        }
+
+        return AttributeValue.builder().s(value.toString()).build();
+    }
+
+    private static AttributeValueUpdate getAttributeValueUpdate(Object value) {
+        return AttributeValueUpdate.builder()
+                .action(AttributeAction.PUT)
+                .value(getAttributeValue(value)).build();
+    }
+}
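
For reference, the type mapping implemented by getAttributeValue resolves JSON values as follows (a sketch; field names are illustrative, values taken from the unit test below):

    "name": "Rajesh"     -> AttributeValue(S=Rajesh)
    "age": 29            -> AttributeValue(N=29)
    "tags": ["a", "b"]   -> AttributeValue(SS=[a, b])
    "issues": [5, 3]     -> AttributeValue(NS=[5, 3])
    "girlfriend": null   -> AttributeValue(NUL=true)
    "doctorate": true    -> AttributeValue(BOOL=true)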
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
new file mode 100644
index 0000000..9f3053a
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kamelets.utils.transform.aws.ddb;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.aws2.ddb.Ddb2Constants;
+import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+
+class JsonToDdbModelConverterTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private final JsonToDdbModelConverter processor = new JsonToDdbModelConverter();
+
+    private final String keyJson = "{" +
+                "\"name\": \"Rajesh Koothrappali\"" +
+            "}";
+
+    private final String itemJson = "{" +
+                "\"name\": \"Rajesh Koothrappali\"," +
+                "\"age\": 29," +
+                "\"super-heroes\": [\"batman\", \"spiderman\", \"wonderwoman\"]," +
+                "\"issues\": [5, 3, 9, 1]," +
+                "\"girlfriend\": null," +
+                "\"doctorate\": true" +
+            "}";
+
+    @BeforeEach
+    void setup() {
+        this.camelContext = new DefaultCamelContext();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapPutItemHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree(itemJson));
+
+        processor.process(Ddb2Operations.PutItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        assertAttributeValueMap(exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapUpdateItemHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + ", \"item\": " + itemJson + "}"));
+
+        processor.process(Ddb2Operations.UpdateItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.UpdateItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_NEW.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class);
+        Assertions.assertEquals(1L, attributeValueMap.size());
+        Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name"));
+
+        assertAttributeValueUpdateMap(exchange.getMessage().getHeader(Ddb2Constants.UPDATE_VALUES, Map.class));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapDeleteItemHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + "}"));
+
+        processor.process(Ddb2Operations.DeleteItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.DeleteItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class);
+        Assertions.assertEquals(1L, attributeValueMap.size());
+        Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name"));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapNestedObjects() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson + "}"));
+
+        processor.process(Ddb2Operations.PutItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class);
+        Assertions.assertEquals(1L, attributeValueMap.size());
+
+        Assertions.assertEquals("AttributeValue(M={name=AttributeValue(S=Rajesh Koothrappali), " +
+                "age=AttributeValue(N=29), " +
+                "super-heroes=AttributeValue(SS=[batman, spiderman, wonderwoman]), " +
+                "issues=AttributeValue(NS=[5, 3, 9, 1]), " +
+                "girlfriend=AttributeValue(NUL=true), " +
+                "doctorate=AttributeValue(BOOL=true)})", attributeValueMap.get("user").toString());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void shouldMapEmptyJson() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{}"));
+
+        processor.process(Ddb2Operations.PutItem.name(), exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
+        Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES));
+
+        Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class);
+        Assertions.assertEquals(0L, attributeValueMap.size());
+    }
+
+    @Test
+    void shouldFailForWrongBodyType() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody("{}");
+
+        Assertions.assertThrows(InvalidPayloadException.class, () -> processor.process(Ddb2Operations.PutItem.name(), exchange));
+    }
+
+    @Test
+    void shouldFailForUnsupportedOperation() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setBody(mapper.readTree("{}"));
+
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> processor.process(Ddb2Operations.BatchGetItems.name(), exchange));
+    }
+
+    private void assertAttributeValueMap(Map<String, AttributeValue> attributeValueMap) {
+        Assertions.assertEquals(6L, attributeValueMap.size());
+        Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name"));
+        Assertions.assertEquals(AttributeValue.builder().n("29").build(), attributeValueMap.get("age"));
+        Assertions.assertEquals(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build(), attributeValueMap.get("super-heroes"));
+        Assertions.assertEquals(AttributeValue.builder().ns("5", "3", "9", "1").build(), attributeValueMap.get("issues"));
+        Assertions.assertEquals(AttributeValue.builder().nul(true).build(), attributeValueMap.get("girlfriend"));
+        Assertions.assertEquals(AttributeValue.builder().bool(true).build(), attributeValueMap.get("doctorate"));
+    }
+
+    private void assertAttributeValueUpdateMap(Map<String, AttributeValueUpdate> attributeValueMap) {
+        Assertions.assertEquals(6L, attributeValueMap.size());
+        Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh Koothrappali").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("name"));
+        Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("age"));
+        Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("super-heroes"));
+        Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5", "3", "9", "1").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("issues"));
+        Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("girlfriend"));
+        Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("doctorate"));
+    }
+}
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
similarity index 60%
copy from library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
copy to library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
index 512e139..2080bd2 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
@@ -18,7 +18,7 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: Kamelet
 metadata:
-  name: aws-ddb-streams-source
+  name: aws-ddb-sink
   annotations:
     camel.apache.org/kamelet.support.level: "Preview"
     camel.apache.org/catalog.version: "main-SNAPSHOT"
@@ -26,16 +26,22 @@ metadata:
     camel.apache.org/provider: "Apache Software Foundation"
     camel.apache.org/kamelet.group: "AWS DynamoDB Streams"
   labels:
-    camel.apache.org/kamelet.type: "source"
+    camel.apache.org/kamelet.type: "sink"
 spec:
   definition:
-    title: "AWS DynamoDB Streams Source"
+    title: "AWS DynamoDB Sink"
     description: |-
-      Receive events from AWS DynamoDB Streams.
+      Send data to the AWS DynamoDB service. The sent data will insert, update, or delete an item on the given AWS DynamoDB table.
 
-      Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB Streams Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'.
-      
-      When using a default Credentials Provider the AWS DynamoDB Streams client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet.
+      Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB service. These parameters are optional because the Kamelet also provides the 'useDefaultCredentialsProvider' option.
+
+      When using a default Credentials Provider the AWS DynamoDB client will load the credentials through this provider and won't use the static credentials. This is the reason the access key and secret key are not mandatory parameters for this Kamelet.
+
+      This Kamelet expects a JSON body. The mapping between the JSON fields and the table attribute values is done by key, so for the input:
+
+      '{"username":"oscerd", "city":"Rome"}'
+
+      the Kamelet will insert or update an item in the given AWS DynamoDB table and set the attributes 'username' and 'city' accordingly. Please note that the JSON object must include the primary key values that define the item.
     required:
       - table
       - region
@@ -66,43 +66,64 @@ spec:
         description: The AWS region to connect to
         type: string
         example: eu-west-1
-      iteratorType:
-        title: Iterator Type
-        description: Defines where in the DynaboDB stream to start getting records. Note that using TRIM_HORIZON can cause a significant delay before the stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider MUST be supplied. There are 4 enums and the value can be one of TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
-        type: string
-        default: LATEST
-      sequenceNumberProvider:
-        title: Sequence Number Provider
-        description: Provider for the sequence number when using one of the two ShardIteratorType AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER iterator types. Can be a registry reference or a literal sequence number.
+      operation:
+        title: Operation
+        description: The operation to perform (one of PutItem, UpdateItem, DeleteItem)
         type: string
-        example: "900000000005745712447"
-        default: "000000000000000000000"
+        default: PutItem
+        example: PutItem
+      writeCapacity:
+        title: Write Capacity
+        description: The provisioned throughput to reserve for writing resources to your table
+        type: integer
+        default: 1
       useDefaultCredentialsProvider:
         title: Default Credentials Provider
         description: Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in.
         type: boolean
         x-descriptors:
-        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
+      uriEndpointOverride:
+        title: Override Endpoint URI
+        description: Set the overriding endpoint URI. This option needs to be used in combination with the overrideEndpoint option.
+        type: string
+      overrideEndpoint:
+        title: Endpoint Override
+        description: Set the need for overriding the endpoint URI. This option needs to be used in combination with the uriEndpointOverride setting.
+        type: boolean
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
   types:
-    out:
+    in:
       mediaType: application/json
   dependencies:
-  - "camel:gson"
+  - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
+  - "camel:core"
+  - "camel:jackson"
   - "camel:aws2-ddb"
   - "camel:kamelet"
   template:
     from:
-      uri: "aws2-ddbstream:{{table}}"
-      parameters:
-        secretKey: "{{?secretKey}}"
-        accessKey: "{{?accessKey}}"
-        region: "{{region}}"
-        iteratorType: "{{iteratorType}}"
-        sequenceNumberProvider: "{{sequenceNumberProvider}}"
-        useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}"
+      uri: "kamelet:source"
       steps:
-      - marshal:
-          json: 
-            library: Gson
-      - to: "kamelet:sink"
+      - set-property:
+          name: operation
+          constant: "{{operation}}"
+      - unmarshal:
+          json:
+            library: Jackson
+            unmarshalType: com.fasterxml.jackson.databind.JsonNode
+      - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter"
+      - to:
+          uri: "aws2-ddb:{{table}}"
+          parameters:
+            secretKey: "{{?secretKey}}"
+            accessKey: "{{?accessKey}}"
+            region: "{{region}}"
+            operation: "{{operation}}"
+            writeCapacity: "{{?writeCapacity}}"
+            useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}"
+            uriEndpointOverride: "{{?uriEndpointOverride}}"
+            overrideEndpoint: "{{overrideEndpoint}}"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
index 512e139..31e62ef 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
@@ -34,7 +34,7 @@ spec:
       Receive events from AWS DynamoDB Streams.
 
       Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB Streams Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'.
-      
+
       When using a default Credentials Provider the AWS DynamoDB Streams client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet.
     required:
       - table
@@ -103,6 +103,6 @@ spec:
         useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}"
       steps:
       - marshal:
-          json: 
+          json:
             library: Gson
       - to: "kamelet:sink"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
index 439777d..ce9439a 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
@@ -17,7 +17,7 @@ spec:
       Receive data from AWS S3 Bucket.
 
       Access Key/Secret Key are the basic method for authenticating to the AWS S3 Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'.
-      
+
       When using a default Credentials Provider the S3 client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet.
     required:
       - bucketNameOrArn
@@ -58,14 +58,14 @@ spec:
         example: eu-west-1
       autoCreateBucket:
         title: Autocreate Bucket
-        description: Setting the autocreation of the S3 bucket bucketName. 
+        description: Setting the autocreation of the S3 bucket bucketName.
         type: boolean
         x-descriptors:
         - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
         default: false
       includeBody:
         title: Include Body
-        description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. 
+        description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata.
         type: boolean
         x-descriptors:
         - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
diff --git a/test/aws-ddb-sink/amazonDDBClient.groovy b/test/aws-ddb-sink/amazonDDBClient.groovy
new file mode 100644
index 0000000..dc0b2a8
--- /dev/null
+++ b/test/aws-ddb-sink/amazonDDBClient.groovy
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient
+import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
+import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement
+import software.amazon.awssdk.services.dynamodb.model.KeyType
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType
+
+DynamoDbClient amazonDDBClient = DynamoDbClient
+        .builder()
+        .endpointOverride(URI.create("${YAKS_TESTCONTAINERS_LOCALSTACK_DYNAMODB_URL}"))
+        .credentialsProvider(StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(
+                        "${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}",
+                        "${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}")
+        ))
+        .region(Region.of("${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}"))
+        .build()
+
+amazonDDBClient.createTable(b -> {
+        b.tableName("${aws.ddb.tableName}")
+        b.keySchema(
+                KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build(),
+        )
+        b.attributeDefinitions(
+                AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.N).build(),
+        )
+        b.provisionedThroughput(
+                ProvisionedThroughput.builder()
+                        .readCapacityUnits(1L)
+                        .writeCapacityUnits(1L).build())
+})
+
+return amazonDDBClient
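
The returned client is registered in the Camel registry as 'amazonDDBClient', so the other test scripts can use it directly; a minimal usage sketch (table name illustrative):

    // scan the test table and print the stored items
    println amazonDDBClient.scan(b -> b.tableName("movies")).items()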
diff --git a/test/mail-sink/yaks-config.yaml b/test/aws-ddb-sink/aws-ddb-sink-binding.yaml
similarity index 50%
copy from test/mail-sink/yaks-config.yaml
copy to test/aws-ddb-sink/aws-ddb-sink-binding.yaml
index c2e8d95..dd04856 100644
--- a/test/mail-sink/yaks-config.yaml
+++ b/test/aws-ddb-sink/aws-ddb-sink-binding.yaml
@@ -15,35 +15,36 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-config:
-  runtime:
-    env:
-      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
-        value: false
-      - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
-        value: false
-    settings:
-      dependencies:
-        - groupId: com.consol.citrus
-          artifactId: citrus-mail
-          version: "@citrus.version@"
-    resources:
-      - mail-server.groovy
-      - timer-to-mail.yaml
-pre:
-- name: installation
-  if: env:CI=true
-  run: |
-    # Install required Kamelets (these steps may be done globally in future versions)
-
-    kamel install -n $YAKS_NAMESPACE -w
-    kubectl delete kamelet -n $YAKS_NAMESPACE --all
-    kubectl apply -f ../../kamelets/mail-sink.kamelet.yaml -n $YAKS_NAMESPACE
-    kubectl apply -f ../../kamelets/timer-source.kamelet.yaml -n $YAKS_NAMESPACE
-post:
-- name: dump
-  if: env:CI=true
-  run: |
-    kamel dump -n $YAKS_NAMESPACE $(basename `pwd`)-dump.log
-    mkdir -p /tmp/dumps
-    cp *-dump.log /tmp/dumps
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: aws-ddb-sink-binding
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: timer-source
+    properties:
+      period: ${timer.source.period}
+      message: '${aws.ddb.json.data}'
+  steps:
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: log-sink
+        properties:
+          showHeaders: true
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: aws-ddb-sink
+    properties:
+      table: ${aws.ddb.tableName}
+      operation: ${aws.ddb.operation}
+      overrideEndpoint: true
+      uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_DYNAMODB_URL}
+      accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
+      secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}
+      region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}
diff --git a/test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature b/test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature
new file mode 100644
index 0000000..48f0a81
--- /dev/null
+++ b/test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature
@@ -0,0 +1,48 @@
+Feature: AWS DDB Sink - DeleteItem
+
+  Background:
+    Given Kamelet aws-ddb-sink is available
+    Given Camel K resource polling configuration
+      | maxAttempts          | 200   |
+      | delayBetweenAttempts | 2000  |
+    Given variables
+      | timer.source.period  | 10000 |
+      | aws.ddb.operation    | DeleteItem |
+      | aws.ddb.tableName    | movies |
+      | aws.ddb.item.id      | 1 |
+      | aws.ddb.item.year    | 1985 |
+      | aws.ddb.item.title   | Back to the future |
+      | aws.ddb.json.data    | {"id": ${aws.ddb.item.id}} |
+
+  Scenario: Start LocalStack container
+    Given Enable service DYNAMODB
+    Given start LocalStack container
+    And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create AWS-DDB client
+    Given New global Camel context
+    Given load to Camel registry amazonDDBClient.groovy
+
+  Scenario: Create item on AWS-DDB
+    Given run script putItem.groovy
+    Given variables
+      | aws.ddb.items     | [{year=AttributeValue(N=${aws.ddb.item.year}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title})}] |
+    Then run script verifyItems.groovy
+
+  Scenario: Create AWS-DDB Kamelet sink binding
+    When load KameletBinding aws-ddb-sink-binding.yaml
+    And KameletBinding aws-ddb-sink-binding is available
+    And Camel K integration aws-ddb-sink-binding is running
+    And Camel K integration aws-ddb-sink-binding should print Routes startup
+    Then sleep 10sec
+
+  Scenario: Verify Kamelet sink
+    Given variables
+      | aws.ddb.items     | [] |
+    Then run script verifyItems.groovy
+
+  Scenario: Remove Camel K resources
+    Given delete KameletBinding aws-ddb-sink-binding
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-ddb-sink/aws-ddb-sink-putItem.feature b/test/aws-ddb-sink/aws-ddb-sink-putItem.feature
new file mode 100644
index 0000000..d01d5ce
--- /dev/null
+++ b/test/aws-ddb-sink/aws-ddb-sink-putItem.feature
@@ -0,0 +1,41 @@
+Feature: AWS DDB Sink - PutItem
+
+  Background:
+    Given Kamelet aws-ddb-sink is available
+    Given Camel K resource polling configuration
+      | maxAttempts          | 200   |
+      | delayBetweenAttempts | 2000  |
+    Given variables
+      | timer.source.period  | 10000 |
+      | aws.ddb.operation    | PutItem |
+      | aws.ddb.tableName    | movies |
+      | aws.ddb.item.id      | 1 |
+      | aws.ddb.item.year    | 1977 |
+      | aws.ddb.item.title   | Star Wars IV |
+      | aws.ddb.json.data    | { "id":${aws.ddb.item.id}, "year":${aws.ddb.item.year}, "title":"${aws.ddb.item.title}" } |
+      | aws.ddb.items        | [{year=AttributeValue(N=${aws.ddb.item.year}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title})}] |
+
+  Scenario: Start LocalStack container
+    Given Enable service DYNAMODB
+    Given start LocalStack container
+    And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create AWS-DDB client
+    Given New global Camel context
+    Given load to Camel registry amazonDDBClient.groovy
+
+  Scenario: Create AWS-DDB Kamelet sink binding
+    When load KameletBinding aws-ddb-sink-binding.yaml
+    And KameletBinding aws-ddb-sink-binding is available
+    And Camel K integration aws-ddb-sink-binding is running
+    And Camel K integration aws-ddb-sink-binding should print Routes startup
+    Then sleep 10sec
+
+  Scenario: Verify Kamelet sink
+    Then run script verifyItems.groovy
+
+  Scenario: Remove Camel K resources
+    Given delete KameletBinding aws-ddb-sink-binding
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-ddb-sink/aws-ddb-sink-updateItem.feature b/test/aws-ddb-sink/aws-ddb-sink-updateItem.feature
new file mode 100644
index 0000000..87fe078
--- /dev/null
+++ b/test/aws-ddb-sink/aws-ddb-sink-updateItem.feature
@@ -0,0 +1,51 @@
+Feature: AWS DDB Sink - UpdateItem
+
+  Background:
+    Given Kamelet aws-ddb-sink is available
+    Given Camel K resource polling configuration
+      | maxAttempts          | 200   |
+      | delayBetweenAttempts | 2000  |
+    Given variables
+      | timer.source.period    | 10000 |
+      | aws.ddb.operation      | UpdateItem |
+      | aws.ddb.tableName      | movies |
+      | aws.ddb.item.id        | 1 |
+      | aws.ddb.item.year      | 1933 |
+      | aws.ddb.item.title     | King Kong |
+      | aws.ddb.item.title.new | King Kong - Historical |
+      | aws.ddb.item.directors | ["Merian C. Cooper", "Ernest B. Schoedsack"] |
+      | aws.ddb.json.data      | { "key": {"id": ${aws.ddb.item.id}}, "item": {"title": "${aws.ddb.item.title.new}", "year": ${aws.ddb.item.year}, "directors": ${aws.ddb.item.directors}} } |
+
+  Scenario: Start LocalStack container
+    Given Enable service DYNAMODB
+    Given start LocalStack container
+    And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}'
+
+  Scenario: Create AWS-DDB client
+    Given New global Camel context
+    Given load to Camel registry amazonDDBClient.groovy
+
+  Scenario: Create item on AWS-DDB
+    Given run script putItem.groovy
+    Given variables
+      | aws.ddb.items | [{year=AttributeValue(N=${aws.ddb.item.year}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title})}] |
+    Then run script verifyItems.groovy
+
+  Scenario: Create AWS-DDB Kamelet sink binding
+    When load KameletBinding aws-ddb-sink-binding.yaml
+    And KameletBinding aws-ddb-sink-binding is available
+    And Camel K integration aws-ddb-sink-binding is running
+    And Camel K integration aws-ddb-sink-binding should print Routes startup
+    Then sleep 10sec
+
+  Scenario: Verify Kamelet sink
+    Given variables
+      | aws.ddb.item.directors | [Ernest B. Schoedsack, Merian C. Cooper] |
+      | aws.ddb.items | [{year=AttributeValue(N=${aws.ddb.item.year}), directors=AttributeValue(SS=${aws.ddb.item.directors}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title.new})}] |
+    Then run script verifyItems.groovy
+
+  Scenario: Remove Camel K resources
+    Given delete KameletBinding aws-ddb-sink-binding
+
+  Scenario: Stop container
+    Given stop LocalStack container
diff --git a/test/aws-ddb-sink/putItem.groovy b/test/aws-ddb-sink/putItem.groovy
new file mode 100644
index 0000000..fd482f9
--- /dev/null
+++ b/test/aws-ddb-sink/putItem.groovy
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue
+
+Map<String, AttributeValue> item = new HashMap<>()
+item.put("id", AttributeValue.builder().n("${aws.ddb.item.id}").build())
+item.put("year", AttributeValue.builder().n("${aws.ddb.item.year}").build())
+item.put("title", AttributeValue.builder().s("${aws.ddb.item.title}").build())
+
+amazonDDBClient.putItem(b -> {
+    b.tableName("${aws.ddb.tableName}")
+    b.item(item)
+    b.returnValues(ReturnValue.ALL_OLD)
+})
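
Note that putItem.groovy uses the SDK's consumer-builder overload of putItem, so no request object is built explicitly; amazonDDBClient is the DynamoDB client that amazonDDBClient.groovy registers in the Camel registry, and the ${...} placeholders are resolved by the test runtime before the script executes. The equivalent explicit-request form (a sketch reusing the item map from the script above) would be:

    import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
    import software.amazon.awssdk.services.dynamodb.model.ReturnValue;

    // Same call as above, with the request spelled out explicitly
    amazonDDBClient.putItem(PutItemRequest.builder()
            .tableName("movies")
            .item(item)
            .returnValues(ReturnValue.ALL_OLD)
            .build());

ReturnValue.ALL_OLD only asks DynamoDB to return the previous attribute values if an item is overwritten; for the freshly created table the response carries none and the scripts ignore it.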
diff --git a/test/aws-ddb-sink/verifyItems.groovy b/test/aws-ddb-sink/verifyItems.groovy
new file mode 100644
index 0000000..b6e9d27
--- /dev/null
+++ b/test/aws-ddb-sink/verifyItems.groovy
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+assert "${aws.ddb.items}".equals(amazonDDBClient.scan(b -> 
b.tableName("${aws.ddb.tableName}"))?.items()?.toString())
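
The assertion in verifyItems.groovy compares the expected ${aws.ddb.items} string with the toString() of a full table scan, which is compact but relies on the SDK's AttributeValue(N=...) rendering and on the table holding exactly the expected items. A more structural variant of the same check, as a stand-alone Java sketch under the same single-item assumption (client creation and the literals are illustrative):

    import java.util.List;
    import java.util.Map;

    import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
    import software.amazon.awssdk.services.dynamodb.model.ScanRequest;

    public class VerifyItemsSketch {
        public static void main(String[] args) {
            try (DynamoDbClient ddb = DynamoDbClient.create()) {
                List<Map<String, AttributeValue>> items = ddb.scan(
                        ScanRequest.builder().tableName("movies").build()).items();
                // The features above always leave exactly one item in the table.
                assert items.size() == 1;
                assert "1933".equals(items.get(0).get("year").n());
            }
        }
    }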
diff --git a/test/aws-ddb-sink/yaks-config.yaml b/test/aws-ddb-sink/yaks-config.yaml
new file mode 100644
index 0000000..6118b7b
--- /dev/null
+++ b/test/aws-ddb-sink/yaks-config.yaml
@@ -0,0 +1,71 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+config:
+  namespace:
+    temporary: false
+  runtime:
+    testcontainers:
+      enabled: true
+    env:
+      - name: YAKS_CAMEL_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KAMELETS_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_TESTCONTAINERS_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: CITRUS_TYPE_CONVERTER
+        value: camel
+    resources:
+      - putItem.groovy
+      - verifyItems.groovy
+      - amazonDDBClient.groovy
+      - aws-ddb-sink-binding.yaml
+    cucumber:
+      tags:
+        - "not @ignored"
+    settings:
+      dependencies:
+        - groupId: com.amazonaws
+          artifactId: aws-java-sdk-dynamodb
+          version: "@aws-java-sdk.version@"
+        - groupId: org.apache.camel
+          artifactId: camel-aws2-ddb
+          version: "@camel.version@"
+        - groupId: org.apache.camel
+          artifactId: camel-jackson
+          version: "@camel.version@"
+pre:
+  - name: installation
+    if: env:CI=true
+    run: |
+      # Install required Kamelets (these steps may be done globally in future versions)
+
+      kamel install -n $YAKS_NAMESPACE -w
+      kubectl delete kamelet -n $YAKS_NAMESPACE --all
+      kubectl apply -f ../../kamelets/timer-source.kamelet.yaml -n $YAKS_NAMESPACE
+      kubectl apply -f ../../kamelets/log-sink.kamelet.yaml -n $YAKS_NAMESPACE
+      kubectl apply -f ../../kamelets/aws-ddb-sink.kamelet.yaml -n $YAKS_NAMESPACE
+post:
+  - name: dump
+    if: env:CI=true
+    run: |
+      kamel dump -n $YAKS_NAMESPACE $(basename `pwd`)-dump.log
+      mkdir -p /tmp/dumps
+      cp *-dump.log /tmp/dumps
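
Two details of this yaks-config.yaml are easy to miss: the YAKS_*_AUTO_REMOVE_RESOURCES variables keep the binding, Kamelets, and LocalStack container alive across the scenarios of a feature (each feature tears them down explicitly in its final "Remove Camel K resources" and "Stop container" scenarios), and the pre/post installation and dump steps only run when env:CI=true, so a local run assumes the listed Kamelets are already installed in $YAKS_NAMESPACE. The @camel.version@ and @aws-java-sdk.version@ placeholders are presumably filtered in from the Maven build.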
diff --git a/test/earthquake-source/earthquake-source.feature b/test/earthquake-source/earthquake-source.feature
index e18a2cf..3a7c180 100644
--- a/test/earthquake-source/earthquake-source.feature
+++ b/test/earthquake-source/earthquake-source.feature
@@ -3,7 +3,7 @@ Feature: Kamelet earthquake-source works
   Background:
     Given Disable auto removal of Kamelet resources
     Given Disable auto removal of Kubernetes resources
-    Given Camel-K resource polling configuration
+    Given Camel K resource polling configuration
       | maxAttempts          | 60   |
       | delayBetweenAttempts | 3000 |
 
diff --git a/test/insert-field-action/insert-field-action.feature b/test/insert-field-action/insert-field-action.feature
index b0f0ca5..850903e 100644
--- a/test/insert-field-action/insert-field-action.feature
+++ b/test/insert-field-action/insert-field-action.feature
@@ -3,13 +3,13 @@ Feature: Timer Source Kamelet
   Background:
     Given Disable auto removal of Kamelet resources
     Given Disable auto removal of Kubernetes resources
-    Given Camel-K resource polling configuration
+    Given Camel K resource polling configuration
       | maxAttempts          | 60   |
       | delayBetweenAttempts | 3000 |
 
   Scenario: Wait for binding to start
     Given create Kubernetes service probe-service with target port 8080
-    Then Camel-K integration insert-field-action-binding should be running
+    Then Camel K integration insert-field-action-binding should be running
 
   Scenario: Verify binding
     Given HTTP server "probe-service"
diff --git a/test/mail-sink/mail-sink.feature b/test/mail-sink/mail-sink.feature
index 489246a..bb889d6 100644
--- a/test/mail-sink/mail-sink.feature
+++ b/test/mail-sink/mail-sink.feature
@@ -1,7 +1,7 @@
 Feature: Mail Sink
 
   Background:
-    Given Camel-K resource polling configuration
+    Given Camel K resource polling configuration
       | maxAttempts          | 200   |
       | delayBetweenAttempts | 2000  |
     Given variables
@@ -17,11 +17,11 @@ Feature: Mail Sink
     Given load endpoint mail-server.groovy
     Given create Kubernetes service mail-server with port mapping 25:22222
 
-  Scenario: Create Camel-K resources
+  Scenario: Create Camel K resources
     Given Kamelet mail-sink is available
     Given Kamelet timer-source is available
     Given load KameletBinding timer-to-mail.yaml
-    And Camel-K integration timer-to-mail should be running
+    And Camel K integration timer-to-mail should be running
 
   Scenario: Verify mail message sent
     Then endpoint mail-server should receive body
@@ -41,6 +41,6 @@ Feature: Mail Sink
     }
     """
 
-  Scenario: Remove Camel-K resources
+  Scenario: Remove Camel K resources
     Given delete KameletBinding timer-to-mail
     And delete Kubernetes service mail-server
diff --git a/test/mail-sink/yaks-config.yaml b/test/mail-sink/yaks-config.yaml
index c2e8d95..910c83a 100644
--- a/test/mail-sink/yaks-config.yaml
+++ b/test/mail-sink/yaks-config.yaml
@@ -16,6 +16,8 @@
 # ---------------------------------------------------------------------------
 
 config:
+  namespace:
+    temporary: true
   runtime:
     env:
       - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
diff --git a/test/timer-source/timer-source.feature b/test/timer-source/timer-source.feature
index 4284725..6917806 100644
--- a/test/timer-source/timer-source.feature
+++ b/test/timer-source/timer-source.feature
@@ -3,7 +3,7 @@ Feature: Timer Source Kamelet
   Background:
     Given Disable auto removal of Kamelet resources
     Given Disable auto removal of Kubernetes resources
-    Given Camel-K resource polling configuration
+    Given Camel K resource polling configuration
       | maxAttempts          | 20   |
       | delayBetweenAttempts | 1000 |
 
