This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
commit a0406cc7a0587decd5eba2fe8f39a5ee5b0302e8 Author: Christoph Deppisch <cdeppi...@redhat.com> AuthorDate: Wed Nov 30 22:02:13 2022 +0100 Revert existing Kamelets to not use data type converter - AWS S3 source Kamelet - AWS DDB sink Kamelet - JsonToDdbModelConverter utility and unit tests --- kamelets/aws-ddb-sink.kamelet.yaml | 25 +-- kamelets/aws-s3-source.kamelet.yaml | 19 -- .../transform/aws/ddb/JsonToDdbModelConverter.java | 201 +++++++++++++++++++++ .../aws/ddb/JsonToDdbModelConverterTest.java | 184 +++++++++++++++++++ .../resources/kamelets/aws-ddb-sink.kamelet.yaml | 25 +-- .../resources/kamelets/aws-s3-source.kamelet.yaml | 19 -- 6 files changed, 395 insertions(+), 78 deletions(-) diff --git a/kamelets/aws-ddb-sink.kamelet.yaml b/kamelets/aws-ddb-sink.kamelet.yaml index 87b338ee..5b603abf 100644 --- a/kamelets/aws-ddb-sink.kamelet.yaml +++ b/kamelets/aws-ddb-sink.kamelet.yaml @@ -97,12 +97,6 @@ spec: x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' default: false - inputFormat: - title: Input Type - description: Specify the input type for this Kamelet. The Kamelet will automatically apply conversion logic in order to transform message content to this data type. - type: string - default: json - example: json types: in: mediaType: application/json @@ -113,26 +107,17 @@ spec: - "camel:aws2-ddb" - "camel:kamelet" template: - beans: - - name: dataTypeRegistry - type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" - - name: inputTypeProcessor - type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" - property: - - key: scheme - value: 'aws2-ddb' - - key: format - value: '{{inputFormat}}' - - key: registry - value: '#bean:{{dataTypeRegistry}}' from: uri: "kamelet:source" steps: - set-property: name: operation constant: "{{operation}}" - - process: - ref: "{{inputTypeProcessor}}" + - unmarshal: + json: + library: Jackson + unmarshalType: com.fasterxml.jackson.databind.JsonNode + - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter" - to: uri: "aws2-ddb:{{table}}" parameters: diff --git a/kamelets/aws-s3-source.kamelet.yaml b/kamelets/aws-s3-source.kamelet.yaml index d937f6e5..6ab2bca4 100644 --- a/kamelets/aws-s3-source.kamelet.yaml +++ b/kamelets/aws-s3-source.kamelet.yaml @@ -107,12 +107,6 @@ spec: description: The number of milliseconds before the next poll of the selected bucket. type: integer default: 500 - outputFormat: - title: Output Type - description: Choose the output type for this Kamelet. The Kamelet supports different output types and performs automatic message conversion according to this data type. 
- type: string - default: binary - example: binary dependencies: - "camel:core" - "camel:aws2-s3" @@ -120,17 +114,6 @@ spec: - "camel:kamelet" template: beans: - - name: dataTypeRegistry - type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" - - name: outputTypeProcessor - type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" - property: - - key: scheme - value: 'aws2-s3' - - key: format - value: '{{outputFormat}}' - - key: registry - value: '#bean:{{dataTypeRegistry}}' - name: renameHeaders type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders" property: @@ -160,6 +143,4 @@ spec: steps: - process: ref: "{{renameHeaders}}" - - process: - ref: "{{outputTypeProcessor}}" - to: "kamelet:sink" diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java new file mode 100644 index 00000000..2a203ed0 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kamelets.utils.transform.aws.ddb; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.component.aws2.ddb.Ddb2Constants; +import org.apache.camel.component.aws2.ddb.Ddb2Operations; +import software.amazon.awssdk.services.dynamodb.model.AttributeAction; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; + +/** + * Maps Json body to DynamoDB attribute value map and sets the attribute map as Camel DynamoDB header entries. + * + * Json property names map to attribute keys and Json property values map to attribute values. + * + * During mapping the Json property types resolve to the respective attribute types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}). + * Primitive typed arrays in Json get mapped to {@code StringSet} or {@code NumberSet} attribute values. + * + * For PutItem operation the Json body defines all item attributes. 
+ * + * For DeleteItem operation the Json body defines only the primary key attributes that identify the item to delete. + * + * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item attributes tht get updated on the item. + * + * The given Json body can use "key" and "item" as top level properties. + * Both define a Json object that will be mapped to respective attribute value maps: + * <pre>{@code + * { + * "key": {}, + * "item": {} + * } + * } + * </pre> + * The converter will extract the objects and set respective attribute value maps as header entries. + * This is a comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation. + * + * In case key and item attribute value maps are identical you can omit the special top level properties completely. + * The converter will map the whole Json body as is then and use it as source for the attribute value map. + */ +public class JsonToDdbModelConverter { + + public String process(@ExchangeProperty("operation") String operation, Exchange exchange) throws InvalidPayloadException { + if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) || + exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) { + return ""; + } + + ObjectMapper mapper = new ObjectMapper(); + + JsonNode jsonBody = exchange.getMessage().getMandatoryBody(JsonNode.class); + + JsonNode key = jsonBody.get("key"); + JsonNode item = jsonBody.get("item"); + + Map<String, Object> keyProps; + if (key != null) { + keyProps = mapper.convertValue(key, new TypeReference<Map<String, Object>>(){}); + } else { + keyProps = mapper.convertValue(jsonBody, new TypeReference<Map<String, Object>>(){}); + } + + Map<String, Object> itemProps; + if (item != null) { + itemProps = mapper.convertValue(item, new TypeReference<Map<String, Object>>(){}); + } else { + itemProps = keyProps; + } + + final Map<String, AttributeValue> keyMap = getAttributeValueMap(keyProps); + + switch (Ddb2Operations.valueOf(operation)) { + case PutItem: + exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.PutItem); + exchange.getMessage().setHeader(Ddb2Constants.ITEM, getAttributeValueMap(itemProps)); + setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_OLD.toString(), exchange); + break; + case UpdateItem: + exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.UpdateItem); + exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap); + exchange.getMessage().setHeader(Ddb2Constants.UPDATE_VALUES, getAttributeValueUpdateMap(itemProps)); + setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_NEW.toString(), exchange); + break; + case DeleteItem: + exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.DeleteItem); + exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap); + setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_OLD.toString(), exchange); + break; + default: + throw new UnsupportedOperationException(String.format("Unsupported operation '%s'", operation)); + } + + return ""; + } + + private void setHeaderIfNotPresent(String headerName, Object value, Exchange exchange) { + exchange.getMessage().setHeader(headerName, value); + } + + private Map<String, AttributeValue> getAttributeValueMap(Map<String, Object> body) { + final Map<String, AttributeValue> attributeValueMap = new LinkedHashMap<>(); + + for (Map.Entry<String, Object> attribute : body.entrySet()) { + 
attributeValueMap.put(attribute.getKey(), getAttributeValue(attribute.getValue())); + } + + return attributeValueMap; + } + + private Map<String, AttributeValueUpdate> getAttributeValueUpdateMap(Map<String, Object> body) { + final Map<String, AttributeValueUpdate> attributeValueMap = new LinkedHashMap<>(); + + for (Map.Entry<String, Object> attribute : body.entrySet()) { + attributeValueMap.put(attribute.getKey(), getAttributeValueUpdate(attribute.getValue())); + } + + return attributeValueMap; + } + + private static AttributeValue getAttributeValue(Object value) { + if (value == null) { + return AttributeValue.builder().nul(true).build(); + } + + if (value instanceof String) { + return AttributeValue.builder().s(value.toString()).build(); + } + + if (value instanceof Integer) { + return AttributeValue.builder().n(value.toString()).build(); + } + + if (value instanceof Boolean) { + return AttributeValue.builder().bool((Boolean) value).build(); + } + + if (value instanceof String[]) { + return AttributeValue.builder().ss((String[]) value).build(); + } + + if (value instanceof int[]) { + return AttributeValue.builder().ns(Stream.of((int[]) value).map(Object::toString).collect(Collectors.toList())).build(); + } + + if (value instanceof List) { + List<?> values = ((List<?>) value); + + if (values.isEmpty()) { + return AttributeValue.builder().ss().build(); + } else if (values.get(0) instanceof Integer) { + return AttributeValue.builder().ns(values.stream().map(Object::toString).collect(Collectors.toList())).build(); + } else { + return AttributeValue.builder().ss(values.stream().map(Object::toString).collect(Collectors.toList())).build(); + } + } + + if (value instanceof Map) { + Map<String, AttributeValue> nestedAttributes = new LinkedHashMap<>(); + + for (Map.Entry<?, ?> nested : ((Map<?, ?>) value).entrySet()) { + nestedAttributes.put(nested.getKey().toString(), getAttributeValue(nested.getValue())); + } + + return AttributeValue.builder().m(nestedAttributes).build(); + } + + return AttributeValue.builder().s(value.toString()).build(); + } + + private static AttributeValueUpdate getAttributeValueUpdate(Object value) { + return AttributeValueUpdate.builder() + .action(AttributeAction.PUT) + .value(getAttributeValue(value)).build(); + } +} \ No newline at end of file diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java new file mode 100644 index 00000000..e88dce4e --- /dev/null +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kamelets.utils.transform.aws.ddb; + +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.component.aws2.ddb.Ddb2Constants; +import org.apache.camel.component.aws2.ddb.Ddb2Operations; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.dynamodb.model.AttributeAction; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; + +class JsonToDdbModelConverterTest { + + private DefaultCamelContext camelContext; + + private final ObjectMapper mapper = new ObjectMapper(); + + private final JsonToDdbModelConverter processor = new JsonToDdbModelConverter(); + + private final String keyJson = "{" + + "\"name\": \"Rajesh Koothrappali\"" + + "}"; + + private final String itemJson = "{" + + "\"name\": \"Rajesh Koothrappali\"," + + "\"age\": 29," + + "\"super-heroes\": [\"batman\", \"spiderman\", \"wonderwoman\"]," + + "\"issues\": [5, 3, 9, 1]," + + "\"girlfriend\": null," + + "\"doctorate\": true" + + "}"; + + @BeforeEach + void setup() { + this.camelContext = new DefaultCamelContext(); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapPutItemHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(itemJson)); + + processor.process(Ddb2Operations.PutItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + assertAttributeValueMap(exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class)); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapUpdateItemHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + ", \"item\": " + itemJson + "}")); + + processor.process(Ddb2Operations.UpdateItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.UpdateItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_NEW.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class); + Assertions.assertEquals(1L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name")); + + assertAttributeValueUpdateMap(exchange.getMessage().getHeader(Ddb2Constants.UPDATE_VALUES, Map.class)); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapDeleteItemHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + "}")); + + 
processor.process(Ddb2Operations.DeleteItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.DeleteItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class); + Assertions.assertEquals(1L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name")); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapNestedObjects() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson + "}")); + + processor.process(Ddb2Operations.PutItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class); + Assertions.assertEquals(1L, attributeValueMap.size()); + + Assertions.assertEquals("AttributeValue(M={name=AttributeValue(S=Rajesh Koothrappali), " + + "age=AttributeValue(N=29), " + + "super-heroes=AttributeValue(SS=[batman, spiderman, wonderwoman]), " + + "issues=AttributeValue(NS=[5, 3, 9, 1]), " + + "girlfriend=AttributeValue(NUL=true), " + + "doctorate=AttributeValue(BOOL=true)})", attributeValueMap.get("user").toString()); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapEmptyJson() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{}")); + + processor.process(Ddb2Operations.PutItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map<String, AttributeValue> attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class); + Assertions.assertEquals(0L, attributeValueMap.size()); + } + + @Test() + void shouldFailForUnsupportedOperation() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{}")); + + Assertions.assertThrows(UnsupportedOperationException.class, () -> processor.process(Ddb2Operations.BatchGetItems.name(), exchange)); + } + + private void assertAttributeValueMap(Map<String, AttributeValue> attributeValueMap) { + Assertions.assertEquals(6L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name")); + Assertions.assertEquals(AttributeValue.builder().n("29").build(), attributeValueMap.get("age")); + Assertions.assertEquals(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build(), attributeValueMap.get("super-heroes")); + Assertions.assertEquals(AttributeValue.builder().ns("5", "3", "9", "1").build(), attributeValueMap.get("issues")); + 
Assertions.assertEquals(AttributeValue.builder().nul(true).build(), attributeValueMap.get("girlfriend")); + Assertions.assertEquals(AttributeValue.builder().bool(true).build(), attributeValueMap.get("doctorate")); + } + + private void assertAttributeValueUpdateMap(Map<String, AttributeValueUpdate> attributeValueMap) { + Assertions.assertEquals(6L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh Koothrappali").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("name")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("age")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("super-heroes")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5", "3", "9", "1").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("issues")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("girlfriend")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("doctorate")); + } +} \ No newline at end of file diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml index 87b338ee..5b603abf 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml @@ -97,12 +97,6 @@ spec: x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' default: false - inputFormat: - title: Input Type - description: Specify the input type for this Kamelet. The Kamelet will automatically apply conversion logic in order to transform message content to this data type. 
- type: string - default: json - example: json types: in: mediaType: application/json @@ -113,26 +107,17 @@ spec: - "camel:aws2-ddb" - "camel:kamelet" template: - beans: - - name: dataTypeRegistry - type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" - - name: inputTypeProcessor - type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" - property: - - key: scheme - value: 'aws2-ddb' - - key: format - value: '{{inputFormat}}' - - key: registry - value: '#bean:{{dataTypeRegistry}}' from: uri: "kamelet:source" steps: - set-property: name: operation constant: "{{operation}}" - - process: - ref: "{{inputTypeProcessor}}" + - unmarshal: + json: + library: Jackson + unmarshalType: com.fasterxml.jackson.databind.JsonNode + - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter" - to: uri: "aws2-ddb:{{table}}" parameters: diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml index d937f6e5..6ab2bca4 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml @@ -107,12 +107,6 @@ spec: description: The number of milliseconds before the next poll of the selected bucket. type: integer default: 500 - outputFormat: - title: Output Type - description: Choose the output type for this Kamelet. The Kamelet supports different output types and performs automatic message conversion according to this data type. - type: string - default: binary - example: binary dependencies: - "camel:core" - "camel:aws2-s3" @@ -120,17 +114,6 @@ spec: - "camel:kamelet" template: beans: - - name: dataTypeRegistry - type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" - - name: outputTypeProcessor - type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" - property: - - key: scheme - value: 'aws2-s3' - - key: format - value: '{{outputFormat}}' - - key: registry - value: '#bean:{{dataTypeRegistry}}' - name: renameHeaders type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders" property: @@ -160,6 +143,4 @@ spec: steps: - process: ref: "{{renameHeaders}}" - - process: - ref: "{{outputTypeProcessor}}" - to: "kamelet:sink"
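For reference, after this revert the aws-ddb-sink route unmarshals the incoming JSON into a Jackson JsonNode and passes it to the JsonToDdbModelConverter bean, which maps the optional "key"/"item" top-level properties into the DynamoDB headers consumed by the aws2-ddb endpoint. The standalone sketch below drives the converter directly, the same way the unit tests in this commit do. It is only a sketch: the wrapping class and main method are illustrative, and it assumes camel-aws2-ddb, camel-support and jackson-databind are on the classpath.

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.component.aws2.ddb.Ddb2Constants;
import org.apache.camel.component.aws2.ddb.Ddb2Operations;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter;
import org.apache.camel.support.DefaultExchange;

// Hypothetical example class, not part of the commit above.
public class JsonToDdbModelConverterSketch {

    public static void main(String[] args) throws Exception {
        // In the Kamelet the Jackson unmarshal step leaves a JsonNode body on
        // the exchange; here the body is set up manually instead.
        Exchange exchange = new DefaultExchange(new DefaultCamelContext());
        exchange.getMessage().setBody(new ObjectMapper().readTree(
                "{\"key\": {\"name\": \"Rajesh Koothrappali\"},"
                + " \"item\": {\"name\": \"Rajesh Koothrappali\", \"age\": 29, \"doctorate\": true}}"));

        // The route stores {{operation}} as an exchange property and the
        // converter receives it as its first argument.
        new JsonToDdbModelConverter().process(Ddb2Operations.UpdateItem.name(), exchange);

        // The converter fills the DynamoDB headers (operation, key attributes,
        // attribute value updates) that the aws2-ddb producer reads.
        System.out.println(exchange.getMessage().getHeader(Ddb2Constants.OPERATION));
        System.out.println(exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class));
        System.out.println(exchange.getMessage().getHeader(Ddb2Constants.UPDATE_VALUES, Map.class));
    }
}

In the route itself those headers then drive the aws2-ddb:{{table}} producer, so the DataTypeProcessor and data type registry beans removed by this commit are no longer required.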