CAMEL-9603 Added support for Kinesis to handle producer requests.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b7ffeacd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b7ffeacd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b7ffeacd

Branch: refs/heads/master
Commit: b7ffeacdfb43fe5ae015568345e251151730dc44
Parents: 1d0a97c
Author: John D. Ament <johndam...@apache.org>
Authored: Sun Feb 14 17:01:16 2016 -0500
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Feb 15 09:27:23 2016 +0100

----------------------------------------------------------------------
 .../camel-aws/src/main/docs/aws-kinesis.adoc    |   4 +-
 .../component/aws/common/AwsExchangeUtil.java   |  31 +++++
 .../component/aws/ddb/AbstractDdbCommand.java   |   8 +-
 .../camel/component/aws/ec2/EC2Producer.java    |  11 +-
 .../component/aws/kinesis/KinesisEndpoint.java  |  25 +++-
 .../component/aws/kinesis/KinesisProducer.java  |  72 ++++++++++++
 .../camel/component/aws/s3/S3Producer.java      |  12 +-
 .../component/aws/sdb/AbstractSdbCommand.java   |   9 --
 .../aws/sdb/DomainMetadataCommand.java          |   2 +
 .../component/aws/sdb/GetAttributesCommand.java |   2 +
 .../component/aws/sdb/ListDomainsCommand.java   |   2 +
 .../camel/component/aws/sdb/SelectCommand.java  |   2 +
 .../camel/component/aws/ses/SesProducer.java    |  11 +-
 .../camel/component/aws/sns/SnsProducer.java    |  12 +-
 .../camel/component/aws/sqs/SqsProducer.java    |  12 +-
 .../aws/common/AwsExchangeUtilTest.java         |  44 +++++++
 .../aws/kinesis/KinesisProducerTest.java        | 117 +++++++++++++++++++
 .../aws/sdb/AbstractSdbCommandTest.java         |   9 --
 18 files changed, 307 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/docs/aws-kinesis.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-kinesis.adoc 
b/components/camel-aws/src/main/docs/aws-kinesis.adoc
index bd38602..5f0c28b 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis.adoc
@@ -2,7 +2,7 @@
 Kinesis Component
 ~~~~~~~~~~~~~~~~~
 
-*Available as of Camel 2.7*
+*Available as of Camel 2.17*
 
 The Kinesis component supports receiving messages from Amazon Kinesis
 service.
@@ -132,7 +132,7 @@ Maven users will need to add the following dependency to 
their pom.xml.
 ---------------------------------------
 
 where `${camel-version`} must be replaced by the actual version of Camel
-(2.7 or higher).
+(2.17 or higher).
 
 [[AWS-KINESIS-SeeAlso]]
 See Also

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java
new file mode 100644
index 0000000..88abfba
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java
@@ -0,0 +1,31 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.camel.component.aws.common;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+public class AwsExchangeUtil {
+    public static Message getMessageForResponse(final Exchange exchange) {
+        if (exchange.getPattern().isOutCapable()) {
+            Message out = exchange.getOut();
+            out.copyFrom(exchange.getIn());
+            return out;
+        }
+        return exchange.getIn();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java
index ace1ee5..12b06ee 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddb/AbstractDdbCommand.java
@@ -25,6 +25,7 @@ import 
com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.component.aws.common.AwsExchangeUtil;
 
 public abstract class AbstractDdbCommand {
     protected DdbConfiguration configuration;
@@ -43,12 +44,7 @@ public abstract class AbstractDdbCommand {
     public abstract void execute();
 
     protected Message getMessageForResponse(Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-        return exchange.getIn();
+        return AwsExchangeUtil.getMessageForResponse(exchange);
     }
 
     protected String determineTableName() {

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java
index e4e1286..a021799 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ec2/EC2Producer.java
@@ -50,6 +50,8 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 /**
  * A Producer which sends messages to the Amazon EC2 Service
  * <a href="http://aws.amazon.com/ec2/">AWS EC2</a>
@@ -122,15 +124,6 @@ public class EC2Producer extends DefaultProducer {
     public EC2Endpoint getEndpoint() {
         return (EC2Endpoint) super.getEndpoint();
     }
-
-    private Message getMessageForResponse(final Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-        return exchange.getIn();
-    }
     
     private void createAndRunInstance(AmazonEC2Client ec2Client, Exchange 
exchange) {
         String ami;

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index befcbaa..fdf1bdd 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -24,6 +24,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
@@ -32,8 +34,8 @@ import org.apache.camel.spi.UriPath;
 /**
  * The aws-kinesis component is for consuming records from Amazon Kinesis 
Streams.
  */
-@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = 
"aws-kinesis:streamName", consumerOnly = true, consumerClass = 
KinesisConsumer.class, label = "cloud,messaging")
-public class KinesisEndpoint extends ScheduledPollEndpoint {
+@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = 
"aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = 
"cloud,messaging")
+public class KinesisEndpoint extends ScheduledPollEndpoint implements 
HeaderFilterStrategyAware {
 
     @UriPath(label = "consumer", description = "Name of the stream")
     @Metadata(required = "true")
@@ -50,6 +52,9 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
     @UriParam(label = "consumer", description = "Defines where in the Kinesis 
stream to start getting records")
     private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
 
+    @UriParam
+    private HeaderFilterStrategy headerFilterStrategy;
+
     public KinesisEndpoint(String uri, String streamName, KinesisComponent 
component) {
         super(uri, component);
         this.streamName = streamName;
@@ -57,7 +62,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        throw new UnsupportedOperationException("Not supported yet.");
+        return new KinesisProducer(this);
     }
 
     @Override
@@ -120,8 +125,20 @@ public class KinesisEndpoint extends ScheduledPollEndpoint 
{
     }
 
     @Override
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return headerFilterStrategy;
+    }
+
+    @Override
+    /**
+     * To use a custom HeaderFilterStrategy to map headers to/from Camel.
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy 
headerFilterStrategy) {
+        this.headerFilterStrategy = headerFilterStrategy;
+    }
+
+    @Override
     public String toString() {
         return "KinesisEndpoint{amazonKinesisClient=[redacted], 
maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + 
iteratorType + ", streamName=" + streamName + '}';
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
new file mode 100644
index 0000000..4ecc1f4
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.impl.DefaultProducer;
+
+import java.nio.ByteBuffer;
+
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
+public class KinesisProducer extends DefaultProducer {
+    public KinesisProducer(KinesisEndpoint endpoint) throws 
NoFactoryAvailableException {
+        super(endpoint);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        PutRecordRequest request = createRequest(exchange);
+        PutRecordResult putRecordResult = 
getEndpoint().getClient().putRecord(request);
+        Message message = getMessageForResponse(exchange);
+        
message.setHeader(KinesisConstants.SEQUENCE_NUMBER,putRecordResult.getSequenceNumber());
+        message.setHeader(KinesisConstants.SHARD_ID, 
putRecordResult.getShardId());
+    }
+
+    @Override
+    public KinesisEndpoint getEndpoint() {
+        return (KinesisEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("KinesisProducer{");
+        sb.append(super.getEndpoint().getEndpointUri());
+        sb.append('}');
+        return sb.toString();
+    }
+
+    private PutRecordRequest createRequest(Exchange exchange) {
+        ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class);
+        Object partitionKey = 
exchange.getIn().getHeader(KinesisConstants.PARTITION_KEY);
+        Object sequenceNumber = 
exchange.getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER);
+        PutRecordRequest putRecordRequest = new PutRecordRequest();
+        putRecordRequest.setData(body);
+        putRecordRequest.setStreamName(getEndpoint().getStreamName());
+        if(sequenceNumber != null) {
+            
putRecordRequest.setSequenceNumberForOrdering(sequenceNumber.toString());
+        }
+        if(partitionKey != null) {
+            putRecordRequest.setPartitionKey(partitionKey.toString());
+        }
+        return putRecordRequest;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
index 191d144..0b97f99 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
@@ -50,6 +50,8 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 /**
  * A Producer which sends messages to the Amazon Web Service Simple Storage 
Service <a
  * href="http://aws.amazon.com/s3/">AWS S3</a>
@@ -299,16 +301,6 @@ public class S3Producer extends DefaultProducer {
         return storageClass;
     }
 
-    private Message getMessageForResponse(final Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-
-        return exchange.getIn();
-    }
-
     protected S3Configuration getConfiguration() {
         return getEndpoint().getConfiguration();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java
index 88db821..74d392d 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/AbstractSdbCommand.java
@@ -39,15 +39,6 @@ public abstract class AbstractSdbCommand {
     }
 
     public abstract void execute();
-    
-    protected Message getMessageForResponse(Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-        return exchange.getIn();
-    }
 
     protected String determineDomainName() {
         String domainName = 
exchange.getIn().getHeader(SdbConstants.DOMAIN_NAME, String.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java
index 625c17e..69755f8 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/DomainMetadataCommand.java
@@ -23,6 +23,8 @@ import 
com.amazonaws.services.simpledb.model.DomainMetadataResult;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 public class DomainMetadataCommand extends AbstractSdbCommand {
 
     public DomainMetadataCommand(AmazonSimpleDB sdbClient, SdbConfiguration 
configuration, Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java
index 3490d7e..5528610 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/GetAttributesCommand.java
@@ -25,6 +25,8 @@ import 
com.amazonaws.services.simpledb.model.GetAttributesResult;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 public class GetAttributesCommand extends AbstractSdbCommand {
     
     public GetAttributesCommand(AmazonSimpleDB sdbClient, SdbConfiguration 
configuration, Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java
index dbaf167..8c79f3f 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/ListDomainsCommand.java
@@ -23,6 +23,8 @@ import 
com.amazonaws.services.simpledb.model.ListDomainsResult;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 public class ListDomainsCommand extends AbstractSdbCommand {
 
     public ListDomainsCommand(AmazonSimpleDB sdbClient, SdbConfiguration 
configuration, Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java
index 3fc860d..ab158a1 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sdb/SelectCommand.java
@@ -23,6 +23,8 @@ import com.amazonaws.services.simpledb.model.SelectResult;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 public class SelectCommand extends AbstractSdbCommand {
 
     public SelectCommand(AmazonSimpleDB sdbClient, SdbConfiguration 
configuration, Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java
index 3473873..874b1ec 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ses/SesProducer.java
@@ -36,6 +36,8 @@ import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.URISupport;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 /**
  * A Producer which sends messages to the Amazon Simple Email Service
  * <a href="http://aws.amazon.com/ses/">AWS SES</a>
@@ -164,15 +166,6 @@ public class SesProducer extends DefaultProducer {
         return subject;
     }
 
-    private Message getMessageForResponse(Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-        return exchange.getIn();
-    }
-
     protected SesConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
index 1619e38..5781155 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
@@ -27,6 +27,8 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 
 /**
  * A Producer which sends messages to the Amazon Web Service Simple 
Notification Service
@@ -59,16 +61,6 @@ public class SnsProducer extends DefaultProducer {
         Message message = getMessageForResponse(exchange);
         message.setHeader(SnsConstants.MESSAGE_ID, result.getMessageId());
     }
-    
-    private Message getMessageForResponse(Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-        
-        return exchange.getIn();
-    }
 
     private String determineSubject(Exchange exchange) {
         String subject = exchange.getIn().getHeader(SnsConstants.SUBJECT, 
String.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index acda4f6..682d75e 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -34,6 +34,8 @@ import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 /**
  * A Producer which sends messages to the Amazon Web Service Simple Queue 
Service
  * <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
@@ -79,16 +81,6 @@ public class SqsProducer extends DefaultProducer {
         LOG.trace("found delay: " + delayValue);
         request.setDelaySeconds(delayValue == null ? Integer.valueOf(0) : 
delayValue);
     }
-
-    private Message getMessageForResponse(Exchange exchange) {
-        if (exchange.getPattern().isOutCapable()) {
-            Message out = exchange.getOut();
-            out.copyFrom(exchange.getIn());
-            return out;
-        }
-        
-        return exchange.getIn();
-    }
     
     protected AmazonSQS getClient() {
         return getEndpoint().getClient();

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java
new file mode 100644
index 0000000..5ff3853
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/common/AwsExchangeUtilTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.common;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertSame;
+
+public class AwsExchangeUtilTest {
+    private Exchange exchange;
+
+    @Before
+    public void setUp() {
+        exchange = new DefaultExchange(new DefaultCamelContext());
+    }
+
+    @Test
+    public void getMessageForResponse() {
+        assertSame(exchange.getIn(), 
AwsExchangeUtil.getMessageForResponse(exchange));
+
+        exchange.setPattern(ExchangePattern.InOut);
+
+        assertSame(exchange.getOut(), 
AwsExchangeUtil.getMessageForResponse(exchange));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
new file mode 100644
index 0000000..b636522
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisProducerTest {
+    private static final String SHARD_ID = "SHARD145";
+    private static final String SEQUENCE_NUMBER = "SEQ123";
+    private static final String STREAM_NAME = "streams";
+    private static final String SAMPLE_RECORD_BODY = "SAMPLE";
+    private static final ByteBuffer SAMPLE_BUFFER = 
ByteBuffer.wrap(SAMPLE_RECORD_BODY.getBytes());
+
+    @Mock
+    private AmazonKinesis kinesisClient;
+    @Mock
+    private KinesisEndpoint kinesisEndpoint;
+    @Mock
+    private Message outMessage;
+    @Mock
+    private Message inMessage;
+    @Mock
+    private PutRecordResult putRecordResult;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private Exchange exchange;
+
+    private KinesisProducer kinesisProducer;
+    @Before
+    public void setup() throws Exception {
+        when(kinesisEndpoint.getClient()).thenReturn(kinesisClient);
+        when(kinesisEndpoint.getEndpointUri()).thenReturn("kinesis://etl");
+        when(kinesisEndpoint.getStreamName()).thenReturn(STREAM_NAME);
+
+        when(exchange.getOut()).thenReturn(outMessage);
+        when(exchange.getIn()).thenReturn(inMessage);
+        when(exchange.getPattern()).thenReturn(ExchangePattern.InOut);
+
+        when(inMessage.getBody(ByteBuffer.class)).thenReturn(SAMPLE_BUFFER);
+
+        when(putRecordResult.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER);
+        when(putRecordResult.getShardId()).thenReturn(SHARD_ID);
+
+        
when(kinesisClient.putRecord(any(PutRecordRequest.class))).thenReturn(putRecordResult);
+
+        kinesisProducer = new KinesisProducer(kinesisEndpoint);
+    }
+
+    @Test
+    public void shouldPutRecordInRightStreamWhenProcessingExchange() throws 
Exception{
+        kinesisProducer.process(exchange);
+
+        ArgumentCaptor<PutRecordRequest> capture = 
ArgumentCaptor.forClass(PutRecordRequest.class);
+        verify(kinesisClient).putRecord(capture.capture());
+        PutRecordRequest request = capture.getValue();
+        ByteBuffer byteBuffer = request.getData();
+        byte[] actualArray = byteBuffer.array();
+        byte[] sampleArray = SAMPLE_BUFFER.array();
+        assertEquals(sampleArray, actualArray);
+        assertEquals(STREAM_NAME, request.getStreamName());
+    }
+
+    @Test
+    public void shouldHaveProperHeadersWhenSending() throws Exception {
+        String partitionKey = "partition";
+        String seqNoForOrdering = "1851";
+        
when(inMessage.getHeader(KinesisConstants.SEQUENCE_NUMBER)).thenReturn(seqNoForOrdering);
+        
when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(partitionKey);
+
+        kinesisProducer.process(exchange);
+
+        ArgumentCaptor<PutRecordRequest> capture = 
ArgumentCaptor.forClass(PutRecordRequest.class);
+        verify(kinesisClient).putRecord(capture.capture());
+        PutRecordRequest request = capture.getValue();
+
+        assertEquals(partitionKey, request.getPartitionKey());
+        assertEquals(seqNoForOrdering, request.getSequenceNumberForOrdering());
+        verify(outMessage).setHeader(KinesisConstants.SEQUENCE_NUMBER, 
SEQUENCE_NUMBER);
+        verify(outMessage).setHeader(KinesisConstants.SHARD_ID, SHARD_ID);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/b7ffeacd/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java
index 0181e8f..d48a814 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sdb/AbstractSdbCommandTest.java
@@ -55,15 +55,6 @@ public class AbstractSdbCommandTest {
     }
     
     @Test
-    public void getMessageForResponse() {
-        assertSame(exchange.getIn(), 
this.command.getMessageForResponse(exchange));
-        
-        exchange.setPattern(ExchangePattern.InOut);
-        
-        assertSame(exchange.getOut(), 
this.command.getMessageForResponse(exchange));
-    }
-    
-    @Test
     public void determineDomainName() {
         assertEquals("DOMAIN1", this.command.determineDomainName());
         

Reply via email to