Repository: camel
Updated Branches:
  refs/heads/master 19e63dbaf -> e78a02960


Added an endpoint for consuming Amazon Kinesis Streams.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72370df1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72370df1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72370df1

Branch: refs/heads/master
Commit: 72370df11c9768b697a930e22c417f1af95dd919
Parents: 19e63db
Author: Candle <can...@candle.me.uk>
Authored: Wed Dec 2 16:12:48 2015 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Dec 5 10:06:17 2015 +0100

----------------------------------------------------------------------
 .../component/aws/kinesis/KinesisComponent.java |  44 +++++
 .../component/aws/kinesis/KinesisConstants.java |  29 ++++
 .../component/aws/kinesis/KinesisConsumer.java  | 126 +++++++++++++++
 .../component/aws/kinesis/KinesisEndpoint.java  | 123 ++++++++++++++
 .../aws/kinesis/RecordStringConverter.java      |  43 +++++
 .../services/org/apache/camel/TypeConverter     |  18 +++
 .../org/apache/camel/component/aws-kinesis      |  18 +++
 .../aws/kinesis/KinesisConsumerTest.java        | 162 +++++++++++++++++++
 .../aws/kinesis/KinesisEndpointTest.java        |  71 ++++++++
 .../aws/kinesis/RecordStringConverterTest.java  |  38 +++++
 10 files changed, 672 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
new file mode 100644
index 0000000..d3f34ab
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import java.util.Map;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KinesisComponent extends UriEndpointComponent {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisComponent.class);
+
+    public KinesisComponent() {
+        super(KinesisEndpoint.class);
+    }
+
+    public KinesisComponent(CamelContext context) {
+        super(context, KinesisEndpoint.class);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        KinesisEndpoint endpoint = new KinesisEndpoint(uri, remaining, this);
+
+        LOG.debug("Created endpoint: {}", endpoint.toString());
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
new file mode 100644
index 0000000..b028123
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+public interface KinesisConstants {
+
+    public static final String SEQUENCE_NUMBER = "KinesisSequenceNumber";
+    public static final String APPROX_ARRIVAL_TIME = 
"KinesisApproximateArrivalTimestamp";
+    public static final String PARTITION_KEY = "KinesisPartitionKey";
+    /**
+     * in a Kinesis Record object, the shard ID is obtained from the 
getPartitionKey method.
+     */
+    public static final String SHARD_ID = "KinesisPartitionKey";
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
new file mode 100644
index 0000000..b301f38
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Record;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KinesisConsumer extends ScheduledBatchPollingConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisConsumer.class);
+
+    private String currentShardIterator = null;
+
+    public KinesisConsumer(KinesisEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    /*
+     * Returns the number of messages polled.
+     */
+    @Override
+    protected int poll() throws Exception {
+        GetRecordsRequest req = new GetRecordsRequest()
+                .withShardIterator(getShardItertor())
+                .withLimit(getEndpoint().getMaxResultsPerRequest())
+                ;
+        GetRecordsResult result = getClient().getRecords(req);
+
+        Queue<Exchange> exchanges = createExchanges(result.getRecords());
+        int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
+
+        // May cache the last successful sequence number, and pass it to the
+        // getRecords request. That way, on the next poll, we start from where
+        // we left off, however, I don't know what happens to subsiquent
+        // exchanges when an earlier echange fails.
+
+        currentShardIterator = result.getNextShardIterator();
+
+        return processedExchangeCount;
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int processedExchanges = 0;
+        while (!exchanges.isEmpty()) {
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, 
exchanges.poll());
+
+            LOG.trace("Processing exchange [{}] started.", exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
+            processedExchanges++;
+        }
+        return processedExchanges;
+    }
+
+    private AmazonKinesis getClient() {
+        return getEndpoint().getClient();
+    }
+
+    @Override
+    public KinesisEndpoint getEndpoint() {
+        return (KinesisEndpoint) super.getEndpoint();
+    }
+
+    private String getShardItertor() {
+        // either return a cached one or get a new one via a GetShardIterator 
request.
+        if (currentShardIterator == null) {
+            DescribeStreamRequest req1 = new DescribeStreamRequest()
+                    .withStreamName(getEndpoint().getStreamName())
+                    ;
+            DescribeStreamResult res1 = getClient().describeStream(req1);
+
+            GetShardIteratorRequest req = new GetShardIteratorRequest()
+                    .withStreamName(getEndpoint().getStreamName())
+                    
.withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // 
XXX only uses the first shard
+                    .withShardIteratorType(getEndpoint().getIteratorType())
+                    ;
+            GetShardIteratorResult result = getClient().getShardIterator(req);
+            currentShardIterator = result.getShardIterator();
+        }
+        LOG.debug("Shard Iterator is: {}", currentShardIterator);
+        return currentShardIterator;
+    }
+
+    private Queue<Exchange> createExchanges(List<Record> records) {
+        Queue<Exchange> exchanges = new ArrayDeque<>();
+        for (Record record : records) {
+            exchanges.add(getEndpoint().createExchange(record));
+        }
+        return exchanges;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
new file mode 100644
index 0000000..b4c7597
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = 
"aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = 
"cloud,messaging")
+public class KinesisEndpoint extends ScheduledPollEndpoint {
+
+    @UriPath(label = "consumer,producer", description = "Name of the stream")
+    @Metadata(required = "true")
+    private String streamName;
+
+    // For now, always assume that we've been supplied a client in the Camel 
registry.
+    @UriParam(label = "consumer", description = "Amazon Kinesis client to use 
for all requests for this endpoint")
+    @Metadata(required = "true")
+    private AmazonKinesis amazonKinesisClient;
+
+    @UriParam(label = "consumer", description = "Maximum number of records 
that will be fetched in each poll")
+    private int maxResultsPerRequest = 1;
+
+    @UriParam(label = "consumer", description = "Defines where in the Kinesis 
stream to start getting records")
+    private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+
+    public KinesisEndpoint(String uri, String streamName, KinesisComponent 
component) {
+        super(uri, component);
+        this.streamName = streamName;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new KinesisConsumer(this, processor);
+    }
+
+    Exchange createExchange(Record record) {
+        Exchange ex = super.createExchange();
+        ex.getIn().setBody(record, Record.class);
+        ex.setProperty(KinesisConstants.APPROX_ARRIVAL_TIME, 
record.getApproximateArrivalTimestamp());
+        ex.setProperty(KinesisConstants.PARTITION_KEY, 
record.getPartitionKey());
+        ex.setProperty(KinesisConstants.SEQUENCE_NUMBER, 
record.getSequenceNumber());
+
+        return ex;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        // probably right.
+        return true;
+    }
+
+    AmazonKinesis getClient() {
+        return amazonKinesisClient;
+    }
+
+    // required for injection.
+    public AmazonKinesis getAmazonKinesisClient() {
+        return amazonKinesisClient;
+    }
+
+    public void setAmazonKinesisClient(AmazonKinesis amazonKinesisClient) {
+        this.amazonKinesisClient = amazonKinesisClient;
+    }
+
+    public int getMaxResultsPerRequest() {
+        return maxResultsPerRequest;
+    }
+
+    public void setMaxResultsPerRequest(int maxResultsPerRequest) {
+        this.maxResultsPerRequest = maxResultsPerRequest;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public void setStreamName(String streamName) {
+        this.streamName = streamName;
+    }
+
+    public ShardIteratorType getIteratorType() {
+        return iteratorType;
+    }
+
+    public void setIteratorType(ShardIteratorType iteratorType) {
+        this.iteratorType = iteratorType;
+    }
+
+    @Override
+    public String toString() {
+        return "KinesisEndpoint{" + "amazonKinesisClient=[redacted], 
maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + 
iteratorType + ", streamName=" + streamName + '}';
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
new file mode 100644
index 0000000..bda8983
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.model.Record;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.camel.Converter;
+
+@Converter
+public class RecordStringConverter {
+
+    @Converter
+    public static String toString(Record record) {
+        List<Byte> bytes = new ArrayList<>();
+        ByteBuffer buf = record.getData().asReadOnlyBuffer();
+        while (buf.hasRemaining()) {
+            bytes.add(buf.get());
+        }
+        byte[] a = new byte[bytes.size()];
+        for (int i = 0; i < bytes.size(); ++i) {
+            a[i] = bytes.get(i);
+        }
+        return new String(a, Charset.forName("UTF-8"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
 
b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
new file mode 100644
index 0000000..28707d6
--- /dev/null
+++ 
b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+org.apache.camel.component.aws.kinesis

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis
 
b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis
new file mode 100644
index 0000000..2329cbf
--- /dev/null
+++ 
b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+class=org.apache.camel.component.aws.kinesis.KinesisComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
new file mode 100644
index 0000000..db0df68
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import java.util.Date;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import static org.hamcrest.CoreMatchers.is;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import static org.mockito.Mockito.*;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisConsumerTest {
+
+    @Mock private AmazonKinesis kinesisClient;
+    @Mock private AsyncProcessor processor;
+
+    private final CamelContext context = new DefaultCamelContext();
+    private final KinesisComponent component = new KinesisComponent(context);
+
+    private KinesisConsumer undertest;
+
+    @Before
+    public void setup() throws Exception {
+        KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", 
component);
+        endpoint.setAmazonKinesisClient(kinesisClient);
+        endpoint.setIteratorType(ShardIteratorType.LATEST);
+        undertest = new KinesisConsumer(endpoint, processor);
+
+        when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
+                .thenReturn(new GetRecordsResult()
+                        .withNextShardIterator("nextShardIterator")
+                );
+        when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
+                .thenReturn(new DescribeStreamResult()
+                        .withStreamDescription(new StreamDescription()
+                                .withShards(new Shard().withShardId("shardId"))
+                        )
+                );
+        
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
+                .thenReturn(new GetShardIteratorResult()
+                        .withShardIterator("shardIterator")
+                );
+    }
+
+    @Test
+    public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
+        undertest.poll();
+
+        final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = 
ArgumentCaptor.forClass(DescribeStreamRequest.class);
+        final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+
+        verify(kinesisClient).describeStream(describeStreamReqCap.capture());
+        assertThat(describeStreamReqCap.getValue().getStreamName(), 
is("streamName"));
+
+        
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
+        assertThat(getShardIteratorReqCap.getValue().getStreamName(), 
is("streamName"));
+        assertThat(getShardIteratorReqCap.getValue().getShardId(), 
is("shardId"));
+        assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), 
is("LATEST"));
+    }
+
+    @Test
+    public void itUsesTheShardIteratorOnPolls() throws Exception {
+        undertest.poll();
+
+        final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = 
ArgumentCaptor.forClass(GetRecordsRequest.class);
+        verify(kinesisClient).getRecords(getRecordsReqCap.capture());
+
+        assertThat(getRecordsReqCap.getValue().getShardIterator(), 
is("shardIterator"));
+    }
+
+
+    @Test
+    public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
+        undertest.poll();
+        undertest.poll();
+
+        final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = 
ArgumentCaptor.forClass(GetRecordsRequest.class);
+
+        verify(kinesisClient, 
times(1)).describeStream(any(DescribeStreamRequest.class));
+        verify(kinesisClient, 
times(1)).getShardIterator(any(GetShardIteratorRequest.class));
+        verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
+        assertThat(getRecordsReqCap.getAllValues().get(0).getShardIterator(), 
is("shardIterator"));
+        assertThat(getRecordsReqCap.getAllValues().get(1).getShardIterator(), 
is("nextShardIterator"));
+    }
+
+    @Test
+    public void recordsAreSentToTheProcessor() throws Exception {
+        when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
+                .thenReturn(new GetRecordsResult()
+                        .withNextShardIterator("nextShardIterator")
+                        .withRecords(new Record().withSequenceNumber("1"), new 
Record().withSequenceNumber("2"))
+                );
+
+        int messageCount = undertest.poll();
+
+        assertThat(messageCount, is(2));
+        final ArgumentCaptor<Exchange> exchangeCaptor = 
ArgumentCaptor.forClass(Exchange.class);
+
+        verify(processor, times(2)).process(exchangeCaptor.capture(), 
any(AsyncCallback.class));
+        
assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(),
 is("1"));
+        
assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(),
 is("2"));
+    }
+
+    @Test
+    public void exchangePropertiesAreSet() throws Exception {
+
+        when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
+                .thenReturn(new GetRecordsResult()
+                        .withNextShardIterator("nextShardIterator")
+                        .withRecords(new Record()
+                                .withSequenceNumber("1")
+                                .withApproximateArrivalTimestamp(new Date(42))
+                                .withPartitionKey("shardId")
+                        )
+                );
+
+        undertest.poll();
+
+        final ArgumentCaptor<Exchange> exchangeCaptor = 
ArgumentCaptor.forClass(Exchange.class);
+
+        verify(processor).process(exchangeCaptor.capture(), 
any(AsyncCallback.class));
+        
assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.APPROX_ARRIVAL_TIME,
 long.class), is(42L));
+        
assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.PARTITION_KEY,
 String.class), is("shardId"));
+        
assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SEQUENCE_NUMBER,
 String.class), is("1"));
+        
assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SHARD_ID, 
String.class), is("shardId"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
new file mode 100644
index 0000000..50653e3
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisEndpointTest {
+
+    @Mock private AmazonKinesis amazonKinesisClient;
+
+    private CamelContext camelContext;
+
+    @Before
+    public void setup() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("kinesisClient", amazonKinesisClient);
+        camelContext = new DefaultCamelContext(registry);
+    }
+
+    @Test
+    public void allTheEndpointParams() throws Exception {
+        KinesisEndpoint endpoint = 
(KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name"
+                + "?amazonKinesisClient=#kinesisClient"
+                + "&maxResultsPerRequest=101"
+                + "&iteratorType=latest"
+                );
+
+        assertThat(endpoint.getClient(), is(amazonKinesisClient));
+        assertThat(endpoint.getStreamName(), is("some_stream_name"));
+        assertThat(endpoint.getIteratorType(), is(ShardIteratorType.LATEST));
+        assertThat(endpoint.getMaxResultsPerRequest(), is(101));
+    }
+
+    @Test
+    public void onlyRequiredEndpointParams() throws Exception {
+        KinesisEndpoint endpoint = 
(KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name"
+                + "?amazonKinesisClient=#kinesisClient"
+                );
+
+        assertThat(endpoint.getClient(), is(amazonKinesisClient));
+        assertThat(endpoint.getStreamName(), is("some_stream_name"));
+        assertThat(endpoint.getIteratorType(), 
is(ShardIteratorType.TRIM_HORIZON));
+        assertThat(endpoint.getMaxResultsPerRequest(), is(1));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
new file mode 100644
index 0000000..48f8edb
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import com.amazonaws.services.kinesis.model.Record;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import static org.hamcrest.CoreMatchers.is;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class RecordStringConverterTest {
+
+    @Test
+    public void convertRecordToString() throws Exception {
+        Record record = new Record()
+                .withSequenceNumber("1")
+                .withData(ByteBuffer.wrap("this is a 
String".getBytes(Charset.forName("UTF-8"))))
+                ;
+
+        String result = RecordStringConverter.toString(record);
+        assertThat(result, is("this is a String"));
+    }
+}
\ No newline at end of file

Reply via email to