Repository: camel
Updated Branches:
  refs/heads/master add6c68e6 -> aee808a68


AWS Kinesis Consumer support for Sequence Number and Shard Id


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/87c48d9e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/87c48d9e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/87c48d9e

Branch: refs/heads/master
Commit: 87c48d9e1b7181b7a0ef2a7a612c9d6138ac1050
Parents: add6c68
Author: Frank Farrell <ofear...@gmail.com>
Authored: Thu Apr 21 09:47:17 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Apr 21 17:06:56 2016 +0200

----------------------------------------------------------------------
 .../camel-aws/src/main/docs/aws-kinesis.adoc    |  6 ++-
 .../component/aws/kinesis/KinesisConsumer.java  | 28 ++++++++++++--
 .../component/aws/kinesis/KinesisEndpoint.java  | 31 ++++++++++++++++
 .../aws/kinesis/KinesisConsumerTest.java        | 39 ++++++++++++++++++++
 .../aws/kinesis/KinesisEndpointTest.java        | 22 +++++++++++
 5 files changed, 121 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/main/docs/aws-kinesis.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-kinesis.adoc 
b/components/camel-aws/src/main/docs/aws-kinesis.adoc
index 05f6468..acb0557 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis.adoc
@@ -38,7 +38,7 @@ The AWS Kinesis component has no options.
 
 
 // endpoint options: START
-The AWS Kinesis component supports 23 endpoint options which are listed below:
+The AWS Kinesis component supports 25 endpoint options which are listed below:
 
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -46,7 +46,9 @@ The AWS Kinesis component supports 23 endpoint options which 
are listed below:
 | streamName | common |  | String | *Required* Name of the stream
 | amazonKinesisClient | common |  | AmazonKinesis | *Required* Amazon Kinesis 
client to use for all requests for this endpoint
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the 
consumer to the Camel routing Error Handler which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages or the likes will now 
be processed as a message and handled by the routing Error Handler. By default 
the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN/ERROR level and ignored.
-| iteratorType | consumer |  | ShardIteratorType | Defines where in the 
Kinesis stream to start getting records
+| iteratorType | consumer | trim_horizon | ShardIteratorType |One of 
trim_horizon, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER or latest. See 
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html[http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html]
 for descriptions of these four iterator types. If iteratorType is 
AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER you must specify the sequenceNumber.
+| shardId | consumer | 0 | String | Defines which shardId in the Kinesis 
stream to get records from.
+| sequenceNumber | consumer || String | The sequence number to start polling 
from. This property is only valid if iteratorType is AT_SEQUENCE_NUMBER or 
AFTER_SEQUENCE_NUMBER.
 | maxResultsPerRequest | consumer | 1 | int | Maximum number of records that 
will be fetched in each poll
 | sendEmptyMessageWhenIdle | consumer | false | boolean | If the polling 
consumer did not poll any files you can enable this option to send an empty 
message (no body) instead.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the 
consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler 
is enabled then this options is not in use. By default the consumer will deal 
with exceptions that will be logged at WARN/ERROR level and ignored.

http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index def7e63..bb27cf9 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -28,6 +28,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -96,14 +97,29 @@ public class KinesisConsumer extends 
ScheduledBatchPollingConsumer {
     private String getShardItertor() {
         // either return a cached one or get a new one via a GetShardIterator 
request.
         if (currentShardIterator == null) {
-            DescribeStreamRequest req1 = new DescribeStreamRequest()
+            String shardId;
+
+            //If ShardId supplied use it, else choose first one
+            if(!getEndpoint().getShardId().isEmpty()){
+                shardId = getEndpoint().getShardId();
+            }
+            else{
+                DescribeStreamRequest req1 = new DescribeStreamRequest()
                     .withStreamName(getEndpoint().getStreamName());
-            DescribeStreamResult res1 = getClient().describeStream(req1);
+                DescribeStreamResult res1 = getClient().describeStream(req1);
+                shardId = 
res1.getStreamDescription().getShards().get(0).getShardId();
+            }
+            LOG.debug("ShardId is: {}", shardId);
 
             GetShardIteratorRequest req = new GetShardIteratorRequest()
                     .withStreamName(getEndpoint().getStreamName())
-                    
.withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // 
XXX only uses the first shard
+                    .withShardId(shardId)
                     .withShardIteratorType(getEndpoint().getIteratorType());
+
+            if(hasSequenceNumber()){
+                
req.withStartingSequenceNumber(getEndpoint().getSequenceNumber());
+            }
+
             GetShardIteratorResult result = getClient().getShardIterator(req);
             currentShardIterator = result.getShardIterator();
         }
@@ -118,4 +134,10 @@ public class KinesisConsumer extends 
ScheduledBatchPollingConsumer {
         }
         return exchanges;
     }
+
+    private boolean hasSequenceNumber(){
+        return !getEndpoint().getSequenceNumber().isEmpty() &&
+                
(getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                        || 
getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index 4e79926..014c3f9 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -46,12 +46,27 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
     @UriParam(label = "consumer", description = "Defines where in the Kinesis 
stream to start getting records")
     private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
 
+    @UriParam(label = "consumer", description = "Defines which shardId in the 
Kinesis stream to get records from")
+    private String shardId ="";
+
+    @UriParam(label = "consumer", description = "The sequence number to start 
polling from")
+    private String sequenceNumber="";
+
+
     public KinesisEndpoint(String uri, String streamName, KinesisComponent 
component) {
         super(uri, component);
         this.streamName = streamName;
     }
 
     @Override
+    protected void doStart() throws Exception {
+        if((iteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || 
iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && 
sequenceNumber.isEmpty()){
+            throw new IllegalArgumentException("Sequence Number must be 
specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
+        }
+        super.doStart();
+    }
+
+    @Override
     public Producer createProducer() throws Exception {
         return new KinesisProducer(this);
     }
@@ -115,4 +130,20 @@ public class KinesisEndpoint extends ScheduledPollEndpoint 
{
         this.iteratorType = iteratorType;
     }
 
+    public String getShardId() {
+        return shardId;
+    }
+
+    public void setShardId(String shardId) {
+        this.shardId = shardId;
+    }
+
+    public String getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+    public void setSequenceNumber(String sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
index 8478f26..5376850 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -47,6 +47,7 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
 
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisConsumerTest {
@@ -101,6 +102,44 @@ public class KinesisConsumerTest {
     }
 
     @Test
+    public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws 
Exception {
+        undertest.getEndpoint().setShardId("shardIdPassedAsUrlParam");
+
+        undertest.poll();
+
+        verify(kinesisClient, 
never()).describeStream(any(DescribeStreamRequest.class));
+
+        final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+
+        
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
+        assertThat(getShardIteratorReqCap.getValue().getStreamName(), 
is("streamName"));
+        assertThat(getShardIteratorReqCap.getValue().getShardId(), 
is("shardIdPassedAsUrlParam"));
+        assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), 
is("LATEST"));
+    }
+
+    @Test
+    public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws 
Exception {
+        undertest.getEndpoint().setSequenceNumber("12345");
+        
undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+
+        undertest.poll();
+
+        final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = 
ArgumentCaptor.forClass(DescribeStreamRequest.class);
+        final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+
+        verify(kinesisClient).describeStream(describeStreamReqCap.capture());
+        assertThat(describeStreamReqCap.getValue().getStreamName(), 
is("streamName"));
+
+        
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
+        assertThat(getShardIteratorReqCap.getValue().getStreamName(), 
is("streamName"));
+        assertThat(getShardIteratorReqCap.getValue().getShardId(), 
is("shardId"));
+        assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), 
is("AFTER_SEQUENCE_NUMBER"));
+        
assertThat(getShardIteratorReqCap.getValue().getStartingSequenceNumber(), 
is("12345"));
+
+    }
+
+
+    @Test
     public void itUsesTheShardIteratorOnPolls() throws Exception {
         undertest.poll();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
index a8f87c2..c6910bc 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws.kinesis;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.camel.CamelContext;
+import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
 import org.junit.Before;
@@ -51,12 +52,16 @@ public class KinesisEndpointTest {
                 + "?amazonKinesisClient=#kinesisClient"
                 + "&maxResultsPerRequest=101"
                 + "&iteratorType=latest"
+                + "&shardId=abc"
+                + "&sequenceNumber=123"
         );
 
         assertThat(endpoint.getClient(), is(amazonKinesisClient));
         assertThat(endpoint.getStreamName(), is("some_stream_name"));
         assertThat(endpoint.getIteratorType(), is(ShardIteratorType.LATEST));
         assertThat(endpoint.getMaxResultsPerRequest(), is(101));
+        assertThat(endpoint.getSequenceNumber(), is("123"));
+        assertThat(endpoint.getShardId(), is("abc"));
     }
 
     @Test
@@ -70,4 +75,21 @@ public class KinesisEndpointTest {
         assertThat(endpoint.getIteratorType(), 
is(ShardIteratorType.TRIM_HORIZON));
         assertThat(endpoint.getMaxResultsPerRequest(), is(1));
     }
+
+    @Test(expected = ResolveEndpointFailedException.class)
+    public void afterSequenceNumberRequiresSequenceNumber() throws Exception {
+        KinesisEndpoint endpoint = (KinesisEndpoint) 
camelContext.getEndpoint("aws-kinesis://some_stream_name"
+                + "?amazonKinesisClient=#kinesisClient"
+                + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+        );
+
+    }
+
+    @Test(expected = ResolveEndpointFailedException.class)
+    public void atSequenceNumberRequiresSequenceNumber() throws Exception {
+        KinesisEndpoint endpoint = (KinesisEndpoint) 
camelContext.getEndpoint("aws-kinesis://some_stream_name"
+                + "?amazonKinesisClient=#kinesisClient"
+                + "&iteratorType=AT_SEQUENCE_NUMBER"
+        );
+    }
 }
\ No newline at end of file

Reply via email to