This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-3.11.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit e99f81cd233f81eece7a0f56e7e2c40b9d9f3d81 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Nov 4 16:33:02 2021 +0100 CAMEL-17167 - Camel-AWS2-SQS: Message attributes can be at most 10 --- .../camel/component/aws2/sqs/Sqs2Producer.java | 95 ++++++++++++---------- .../SqsProducerSendLocalstackMaxAttributesIT.java | 70 ++++++++++++++++ 2 files changed, 120 insertions(+), 45 deletions(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java index e168517..710f495 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java @@ -56,6 +56,8 @@ public class Sqs2Producer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(Sqs2Producer.class); + private static final int MAX_ATTRIBUTES = 10; + private transient String sqsProducerToString; public Sqs2Producer(Sqs2Endpoint endpoint) { @@ -279,53 +281,56 @@ public class Sqs2Producer extends DefaultProducer { // only put the message header which is not filtered into the // message attribute if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) { - Object value = entry.getValue(); - if (value instanceof String && !((String) value).isEmpty()) { - MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); - mav.dataType("String"); - mav.stringValue((String) value); - result.put(entry.getKey(), mav.build()); - } else if (value instanceof ByteBuffer) { - MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); - mav.dataType("Binary"); - mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value)); - result.put(entry.getKey(), mav.build()); - } else if (value instanceof Boolean) { - MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); - mav.dataType("Number.Boolean"); - mav.stringValue(((Boolean) value) ? "1" : "0"); - result.put(entry.getKey(), mav.build()); - } else if (value instanceof Number) { - MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); - final String dataType; - if (value instanceof Integer) { - dataType = "Number.int"; - } else if (value instanceof Byte) { - dataType = "Number.byte"; - } else if (value instanceof Double) { - dataType = "Number.double"; - } else if (value instanceof Float) { - dataType = "Number.float"; - } else if (value instanceof Long) { - dataType = "Number.long"; - } else if (value instanceof Short) { - dataType = "Number.short"; + // We are going to put the first MAX_ATTRIBUTES headers, because this is the maximum Attributes an SQS Message could accept + if (result.size() < MAX_ATTRIBUTES) { + Object value = entry.getValue(); + if (value instanceof String && !((String) value).isEmpty()) { + MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); + mav.dataType("String"); + mav.stringValue((String) value); + result.put(entry.getKey(), mav.build()); + } else if (value instanceof ByteBuffer) { + MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); + mav.dataType("Binary"); + mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value)); + result.put(entry.getKey(), mav.build()); + } else if (value instanceof Boolean) { + MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); + mav.dataType("Number.Boolean"); + mav.stringValue(((Boolean) value) ? "1" : "0"); + result.put(entry.getKey(), mav.build()); + } else if (value instanceof Number) { + MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); + final String dataType; + if (value instanceof Integer) { + dataType = "Number.int"; + } else if (value instanceof Byte) { + dataType = "Number.byte"; + } else if (value instanceof Double) { + dataType = "Number.double"; + } else if (value instanceof Float) { + dataType = "Number.float"; + } else if (value instanceof Long) { + dataType = "Number.long"; + } else if (value instanceof Short) { + dataType = "Number.short"; + } else { + dataType = "Number"; + } + mav.dataType(dataType); + mav.stringValue(value.toString()); + result.put(entry.getKey(), mav.build()); + } else if (value instanceof Date) { + MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); + mav.dataType("String"); + mav.stringValue(value.toString()); + result.put(entry.getKey(), mav.build()); } else { - dataType = "Number"; + // cannot translate the message header to message attribute + // value + LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), + entry.getValue()); } - mav.dataType(dataType); - mav.stringValue(value.toString()); - result.put(entry.getKey(), mav.build()); - } else if (value instanceof Date) { - MessageAttributeValue.Builder mav = MessageAttributeValue.builder(); - mav.dataType("String"); - mav.stringValue(value.toString()); - result.put(entry.getKey(), mav.build()); - } else { - // cannot translate the message header to message attribute - // value - LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), - entry.getValue()); } } } diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java new file mode 100644 index 0000000..544e18a --- /dev/null +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.sqs.integration; + +import org.apache.camel.*; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +public class SqsProducerSendLocalstackMaxAttributesIT extends Aws2SQSBaseTest { + + @EndpointInject("direct:start") + private ProducerTemplate template; + + @EndpointInject("mock:result") + private MockEndpoint result; + + @Test + public void sendInOnly() throws Exception { + result.expectedMessageCount(1); + + Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader("value1", "value1"); + exchange.getIn().setHeader("value2", "value2"); + exchange.getIn().setHeader("value3", "value3"); + exchange.getIn().setHeader("value4", "value4"); + exchange.getIn().setHeader("value5", "value5"); + exchange.getIn().setHeader("value6", "value6"); + exchange.getIn().setHeader("value7", "value7"); + exchange.getIn().setHeader("value8", "value8"); + exchange.getIn().setHeader("value9", "value9"); + exchange.getIn().setHeader("value10", "value10"); + exchange.getIn().setHeader("value11", "value11"); + exchange.getIn().setBody("Test"); + } + }); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").startupOrder(2) + .toF("aws2-sqs://%s?autoCreateQueue=true", sharedNameGenerator.getName()); + + fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true", sharedNameGenerator.getName()) + .startupOrder(1).log("${body}").to("mock:result"); + } + }; + } +}