CAMEL-9603 Added support for Kinesis to handle producer requests.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b7ffeacd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b7ffeacd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b7ffeacd Branch: refs/heads/master Commit: b7ffeacdfb43fe5ae015568345e251151730dc44 Parents: 1d0a97c Author: John D. Ament <johndam...@apache.org> Authored: Sun Feb 14 17:01:16 2016 -0500 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Feb 15 09:27:23 2016 +0100 ---------------------------------------------------------------------- .../camel-aws/src/main/docs/aws-kinesis.adoc | 4 +- .../component/aws/common/AwsExchangeUtil.java | 31 +++++ .../component/aws/ddb/AbstractDdbCommand.java | 8 +- .../camel/component/aws/ec2/EC2Producer.java | 11 +- .../component/aws/kinesis/KinesisEndpoint.java | 25 +++- .../component/aws/kinesis/KinesisProducer.java | 72 ++++++++++++ .../camel/component/aws/s3/S3Producer.java | 12 +- .../component/aws/sdb/AbstractSdbCommand.java | 9 -- .../aws/sdb/DomainMetadataCommand.java | 2 + .../component/aws/sdb/GetAttributesCommand.java | 2 + .../component/aws/sdb/ListDomainsCommand.java | 2 + .../camel/component/aws/sdb/SelectCommand.java | 2 + .../camel/component/aws/ses/SesProducer.java | 11 +- .../camel/component/aws/sns/SnsProducer.java | 12 +- .../camel/component/aws/sqs/SqsProducer.java | 12 +- .../aws/common/AwsExchangeUtilTest.java | 44 +++++++ .../aws/kinesis/KinesisProducerTest.java | 117 +++++++++++++++++++ .../aws/sdb/AbstractSdbCommandTest.java | 9 -- 18 files changed, 307 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/docs/aws-kinesis.adoc ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/docs/aws-kinesis.adoc b/components/camel-aws/src/main/docs/aws-kinesis.adoc index bd38602..5f0c28b 100644 --- a/components/camel-aws/src/main/docs/aws-kinesis.adoc +++ b/components/camel-aws/src/main/docs/aws-kinesis.adoc @@ -2,7 +2,7 @@ Kinesis Component ~~~~~~~~~~~~~~~~~ -*Available as of Camel 2.7* +*Available as of Camel 2.17* The Kinesis component supports receiving messages from Amazon Kinesis service. @@ -132,7 +132,7 @@ Maven users will need to add the following dependency to their pom.xml. --------------------------------------- where `${camel-version`} must be replaced by the actual version of Camel -(2.7 or higher). +(2.17 or higher). [[AWS-KINESIS-SeeAlso]] See Also http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java new file mode 100644 index 0000000..88abfba --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.common; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; + +public class AwsExchangeUtil { + public static Message getMessageForResponse(final Exchange exchange) { + if (exchange.getPattern().isOutCapable()) { + Message out = exchange.getOut(); + out.copyFrom(exchange.getIn()); + return out; + } + return exchange.getIn(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java index ace1ee5..12b06ee 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java @@ -25,6 +25,7 @@ import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.component.aws.common.AwsExchangeUtil; public abstract class AbstractDdbCommand { protected DdbConfiguration configuration; @@ -43,12 +44,7 @@ public abstract class AbstractDdbCommand { public abstract void execute(); protected Message getMessageForResponse(Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - return exchange.getIn(); + return AwsExchangeUtil.getMessageForResponse(exchange); } protected String determineTableName() { http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java index e4e1286..a021799 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java @@ -50,6 +50,8 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + /** * A Producer which sends messages to the Amazon EC2 Service * <a href="http://aws.amazon.com/ec2/">AWS EC2</a> @@ -122,15 +124,6 @@ public class EC2Producer extends DefaultProducer { public EC2Endpoint getEndpoint() { return (EC2Endpoint) super.getEndpoint(); } - - private Message getMessageForResponse(final Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - return exchange.getIn(); - } private void createAndRunInstance(AmazonEC2Client ec2Client, Exchange exchange) { String ami; http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java index befcbaa..fdf1bdd 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java @@ -24,6 +24,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.ScheduledPollEndpoint; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -32,8 +34,8 @@ import org.apache.camel.spi.UriPath; /** * The aws-kinesis component is for consuming records from Amazon Kinesis Streams. */ -@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerOnly = true, consumerClass = KinesisConsumer.class, label = "cloud,messaging") -public class KinesisEndpoint extends ScheduledPollEndpoint { +@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = "cloud,messaging") +public class KinesisEndpoint extends ScheduledPollEndpoint implements HeaderFilterStrategyAware { @UriPath(label = "consumer", description = "Name of the stream") @Metadata(required = "true") @@ -50,6 +52,9 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @UriParam(label = "consumer", description = "Defines where in the Kinesis stream to start getting records") private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON; + @UriParam + private HeaderFilterStrategy headerFilterStrategy; + public KinesisEndpoint(String uri, String streamName, KinesisComponent component) { super(uri, component); this.streamName = streamName; @@ -57,7 +62,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @Override public Producer createProducer() throws Exception { - throw new UnsupportedOperationException("Not supported yet."); + return new KinesisProducer(this); } @Override @@ -120,8 +125,20 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { } @Override + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy; + } + + @Override + /** + * To use a custom HeaderFilterStrategy to map headers to/from Camel. + */ + public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) { + this.headerFilterStrategy = headerFilterStrategy; + } + + @Override public String toString() { return "KinesisEndpoint{amazonKinesisClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + iteratorType + ", streamName=" + streamName + '}'; } - } http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java new file mode 100644 index 0000000..4ecc1f4 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.model.PutRecordRequest; +import com.amazonaws.services.kinesis.model.PutRecordResult; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.NoFactoryAvailableException; +import org.apache.camel.impl.DefaultProducer; + +import java.nio.ByteBuffer; + +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + +public class KinesisProducer extends DefaultProducer { + public KinesisProducer(KinesisEndpoint endpoint) throws NoFactoryAvailableException { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + PutRecordRequest request = createRequest(exchange); + PutRecordResult putRecordResult = getEndpoint().getClient().putRecord(request); + Message message = getMessageForResponse(exchange); + message.setHeader(KinesisConstants.SEQUENCE_NUMBER,putRecordResult.getSequenceNumber()); + message.setHeader(KinesisConstants.SHARD_ID, putRecordResult.getShardId()); + } + + @Override + public KinesisEndpoint getEndpoint() { + return (KinesisEndpoint) super.getEndpoint(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("KinesisProducer{"); + sb.append(super.getEndpoint().getEndpointUri()); + sb.append('}'); + return sb.toString(); + } + + private PutRecordRequest createRequest(Exchange exchange) { + ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class); + Object partitionKey = exchange.getIn().getHeader(KinesisConstants.PARTITION_KEY); + Object sequenceNumber = exchange.getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER); + PutRecordRequest putRecordRequest = new PutRecordRequest(); + putRecordRequest.setData(body); + putRecordRequest.setStreamName(getEndpoint().getStreamName()); + if(sequenceNumber != null) { + putRecordRequest.setSequenceNumberForOrdering(sequenceNumber.toString()); + } + if(partitionKey != null) { + putRecordRequest.setPartitionKey(partitionKey.toString()); + } + return putRecordRequest; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java index 191d144..0b97f99 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java @@ -50,6 +50,8 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + /** * A Producer which sends messages to the Amazon Web Service Simple Storage Service <a * href="http://aws.amazon.com/s3/">AWS S3</a> @@ -299,16 +301,6 @@ public class S3Producer extends DefaultProducer { return storageClass; } - private Message getMessageForResponse(final Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - - return exchange.getIn(); - } - protected S3Configuration getConfiguration() { return getEndpoint().getConfiguration(); } http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java index 88db821..74d392d 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java @@ -39,15 +39,6 @@ public abstract class AbstractSdbCommand { } public abstract void execute(); - - protected Message getMessageForResponse(Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - return exchange.getIn(); - } protected String determineDomainName() { String domainName = exchange.getIn().getHeader(SdbConstants.DOMAIN_NAME, String.class); http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java index 625c17e..69755f8 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java @@ -23,6 +23,8 @@ import com.amazonaws.services.simpledb.model.DomainMetadataResult; import org.apache.camel.Exchange; import org.apache.camel.Message; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + public class DomainMetadataCommand extends AbstractSdbCommand { public DomainMetadataCommand(AmazonSimpleDB sdbClient, SdbConfiguration configuration, Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java index 3490d7e..5528610 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java @@ -25,6 +25,8 @@ import com.amazonaws.services.simpledb.model.GetAttributesResult; import org.apache.camel.Exchange; import org.apache.camel.Message; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + public class GetAttributesCommand extends AbstractSdbCommand { public GetAttributesCommand(AmazonSimpleDB sdbClient, SdbConfiguration configuration, Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java index dbaf167..8c79f3f 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java @@ -23,6 +23,8 @@ import com.amazonaws.services.simpledb.model.ListDomainsResult; import org.apache.camel.Exchange; import org.apache.camel.Message; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + public class ListDomainsCommand extends AbstractSdbCommand { public ListDomainsCommand(AmazonSimpleDB sdbClient, SdbConfiguration configuration, Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java index 3fc860d..ab158a1 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java @@ -23,6 +23,8 @@ import com.amazonaws.services.simpledb.model.SelectResult; import org.apache.camel.Exchange; import org.apache.camel.Message; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + public class SelectCommand extends AbstractSdbCommand { public SelectCommand(AmazonSimpleDB sdbClient, SdbConfiguration configuration, Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java index 3473873..874b1ec 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java @@ -36,6 +36,8 @@ import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.URISupport; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + /** * A Producer which sends messages to the Amazon Simple Email Service * <a href="http://aws.amazon.com/ses/">AWS SES</a> @@ -164,15 +166,6 @@ public class SesProducer extends DefaultProducer { return subject; } - private Message getMessageForResponse(Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - return exchange.getIn(); - } - protected SesConfiguration getConfiguration() { return getEndpoint().getConfiguration(); } http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java index 1619e38..5781155 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java @@ -27,6 +27,8 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + /** * A Producer which sends messages to the Amazon Web Service Simple Notification Service @@ -59,16 +61,6 @@ public class SnsProducer extends DefaultProducer { Message message = getMessageForResponse(exchange); message.setHeader(SnsConstants.MESSAGE_ID, result.getMessageId()); } - - private Message getMessageForResponse(Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - - return exchange.getIn(); - } private String determineSubject(Exchange exchange) { String subject = exchange.getIn().getHeader(SnsConstants.SUBJECT, String.class); http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java index acda4f6..682d75e 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java @@ -34,6 +34,8 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + /** * A Producer which sends messages to the Amazon Web Service Simple Queue Service * <a href="http://aws.amazon.com/sqs/">AWS SQS</a> @@ -79,16 +81,6 @@ public class SqsProducer extends DefaultProducer { LOG.trace("found delay: " + delayValue); request.setDelaySeconds(delayValue == null ? Integer.valueOf(0) : delayValue); } - - private Message getMessageForResponse(Exchange exchange) { - if (exchange.getPattern().isOutCapable()) { - Message out = exchange.getOut(); - out.copyFrom(exchange.getIn()); - return out; - } - - return exchange.getIn(); - } protected AmazonSQS getClient() { return getEndpoint().getClient(); http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java new file mode 100644 index 0000000..5ff3853 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.common; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.DefaultExchange; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertSame; + +public class AwsExchangeUtilTest { + private Exchange exchange; + + @Before + public void setUp() { + exchange = new DefaultExchange(new DefaultCamelContext()); + } + + @Test + public void getMessageForResponse() { + assertSame(exchange.getIn(), AwsExchangeUtil.getMessageForResponse(exchange)); + + exchange.setPattern(ExchangePattern.InOut); + + assertSame(exchange.getOut(), AwsExchangeUtil.getMessageForResponse(exchange)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java new file mode 100644 index 0000000..b636522 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.PutRecordRequest; +import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KinesisProducerTest { + private static final String SHARD_ID = "SHARD145"; + private static final String SEQUENCE_NUMBER = "SEQ123"; + private static final String STREAM_NAME = "streams"; + private static final String SAMPLE_RECORD_BODY = "SAMPLE"; + private static final ByteBuffer SAMPLE_BUFFER = ByteBuffer.wrap(SAMPLE_RECORD_BODY.getBytes()); + + @Mock + private AmazonKinesis kinesisClient; + @Mock + private KinesisEndpoint kinesisEndpoint; + @Mock + private Message outMessage; + @Mock + private Message inMessage; + @Mock + private PutRecordResult putRecordResult; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Exchange exchange; + + private KinesisProducer kinesisProducer; + @Before + public void setup() throws Exception { + when(kinesisEndpoint.getClient()).thenReturn(kinesisClient); + when(kinesisEndpoint.getEndpointUri()).thenReturn("kinesis://etl"); + when(kinesisEndpoint.getStreamName()).thenReturn(STREAM_NAME); + + when(exchange.getOut()).thenReturn(outMessage); + when(exchange.getIn()).thenReturn(inMessage); + when(exchange.getPattern()).thenReturn(ExchangePattern.InOut); + + when(inMessage.getBody(ByteBuffer.class)).thenReturn(SAMPLE_BUFFER); + + when(putRecordResult.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER); + when(putRecordResult.getShardId()).thenReturn(SHARD_ID); + + when(kinesisClient.putRecord(any(PutRecordRequest.class))).thenReturn(putRecordResult); + + kinesisProducer = new KinesisProducer(kinesisEndpoint); + } + + @Test + public void shouldPutRecordInRightStreamWhenProcessingExchange() throws Exception{ + kinesisProducer.process(exchange); + + ArgumentCaptor<PutRecordRequest> capture = ArgumentCaptor.forClass(PutRecordRequest.class); + verify(kinesisClient).putRecord(capture.capture()); + PutRecordRequest request = capture.getValue(); + ByteBuffer byteBuffer = request.getData(); + byte[] actualArray = byteBuffer.array(); + byte[] sampleArray = SAMPLE_BUFFER.array(); + assertEquals(sampleArray, actualArray); + assertEquals(STREAM_NAME, request.getStreamName()); + } + + @Test + public void shouldHaveProperHeadersWhenSending() throws Exception { + String partitionKey = "partition"; + String seqNoForOrdering = "1851"; + when(inMessage.getHeader(KinesisConstants.SEQUENCE_NUMBER)).thenReturn(seqNoForOrdering); + when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(partitionKey); + + kinesisProducer.process(exchange); + + ArgumentCaptor<PutRecordRequest> capture = ArgumentCaptor.forClass(PutRecordRequest.class); + verify(kinesisClient).putRecord(capture.capture()); + PutRecordRequest request = capture.getValue(); + + assertEquals(partitionKey, request.getPartitionKey()); + assertEquals(seqNoForOrdering, request.getSequenceNumberForOrdering()); + verify(outMessage).setHeader(KinesisConstants.SEQUENCE_NUMBER, SEQUENCE_NUMBER); + verify(outMessage).setHeader(KinesisConstants.SHARD_ID, SHARD_ID); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java index 0181e8f..d48a814 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java @@ -55,15 +55,6 @@ public class AbstractSdbCommandTest { } @Test - public void getMessageForResponse() { - assertSame(exchange.getIn(), this.command.getMessageForResponse(exchange)); - - exchange.setPattern(ExchangePattern.InOut); - - assertSame(exchange.getOut(), this.command.getMessageForResponse(exchange)); - } - - @Test public void determineDomainName() { assertEquals("DOMAIN1", this.command.determineDomainName());