Repository: camel Updated Branches: refs/heads/master add6c68e6 -> aee808a68
AWS Kinesis Consumer support for Sequence Number and Shard Id Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/87c48d9e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/87c48d9e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/87c48d9e Branch: refs/heads/master Commit: 87c48d9e1b7181b7a0ef2a7a612c9d6138ac1050 Parents: add6c68 Author: Frank Farrell <ofear...@gmail.com> Authored: Thu Apr 21 09:47:17 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 21 17:06:56 2016 +0200 ---------------------------------------------------------------------- .../camel-aws/src/main/docs/aws-kinesis.adoc | 6 ++- .../component/aws/kinesis/KinesisConsumer.java | 28 ++++++++++++-- .../component/aws/kinesis/KinesisEndpoint.java | 31 ++++++++++++++++ .../aws/kinesis/KinesisConsumerTest.java | 39 ++++++++++++++++++++ .../aws/kinesis/KinesisEndpointTest.java | 22 +++++++++++ 5 files changed, 121 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/main/docs/aws-kinesis.adoc ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/docs/aws-kinesis.adoc b/components/camel-aws/src/main/docs/aws-kinesis.adoc index 05f6468..acb0557 100644 --- a/components/camel-aws/src/main/docs/aws-kinesis.adoc +++ b/components/camel-aws/src/main/docs/aws-kinesis.adoc @@ -38,7 +38,7 @@ The AWS Kinesis component has no options. // endpoint options: START -The AWS Kinesis component supports 23 endpoint options which are listed below: +The AWS Kinesis component supports 25 endpoint options which are listed below: [width="100%",cols="2s,1,1m,1m,5",options="header"] |======================================================================= @@ -46,7 +46,9 @@ The AWS Kinesis component supports 23 endpoint options which are listed below: | streamName | common | | String | *Required* Name of the stream | amazonKinesisClient | common | | AmazonKinesis | *Required* Amazon Kinesis client to use for all requests for this endpoint | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored. -| iteratorType | consumer | | ShardIteratorType | Defines where in the Kinesis stream to start getting records +| iteratorType | consumer | trim_horizon | ShardIteratorType |One of trim_horizon, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER or latest. See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html[http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html] for descriptions of these four iterator types. If iteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER you must specify the sequenceNumber. +| shardId | consumer | 0 | String | Defines which shardId in the Kinesis stream to get records from. +| sequenceNumber | consumer || String | The sequence number to start polling from. This property is only valid if iteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. | maxResultsPerRequest | consumer | 1 | int | Maximum number of records that will be fetched in each poll | sendEmptyMessageWhenIdle | consumer | false | boolean | If the polling consumer did not poll any files you can enable this option to send an empty message (no body) instead. | exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored. http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java index def7e63..bb27cf9 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java @@ -28,6 +28,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -96,14 +97,29 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer { private String getShardItertor() { // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { - DescribeStreamRequest req1 = new DescribeStreamRequest() + String shardId; + + //If ShardId supplied use it, else choose first one + if(!getEndpoint().getShardId().isEmpty()){ + shardId = getEndpoint().getShardId(); + } + else{ + DescribeStreamRequest req1 = new DescribeStreamRequest() .withStreamName(getEndpoint().getStreamName()); - DescribeStreamResult res1 = getClient().describeStream(req1); + DescribeStreamResult res1 = getClient().describeStream(req1); + shardId = res1.getStreamDescription().getShards().get(0).getShardId(); + } + LOG.debug("ShardId is: {}", shardId); GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamName(getEndpoint().getStreamName()) - .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard + .withShardId(shardId) .withShardIteratorType(getEndpoint().getIteratorType()); + + if(hasSequenceNumber()){ + req.withStartingSequenceNumber(getEndpoint().getSequenceNumber()); + } + GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); } @@ -118,4 +134,10 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer { } return exchanges; } + + private boolean hasSequenceNumber(){ + return !getEndpoint().getSequenceNumber().isEmpty() && + (getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) + || getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java index 4e79926..014c3f9 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java @@ -46,12 +46,27 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @UriParam(label = "consumer", description = "Defines where in the Kinesis stream to start getting records") private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON; + @UriParam(label = "consumer", description = "Defines which shardId in the Kinesis stream to get records from") + private String shardId =""; + + @UriParam(label = "consumer", description = "The sequence number to start polling from") + private String sequenceNumber=""; + + public KinesisEndpoint(String uri, String streamName, KinesisComponent component) { super(uri, component); this.streamName = streamName; } @Override + protected void doStart() throws Exception { + if((iteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && sequenceNumber.isEmpty()){ + throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER"); + } + super.doStart(); + } + + @Override public Producer createProducer() throws Exception { return new KinesisProducer(this); } @@ -115,4 +130,20 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { this.iteratorType = iteratorType; } + public String getShardId() { + return shardId; + } + + public void setShardId(String shardId) { + this.shardId = shardId; + } + + public String getSequenceNumber() { + return sequenceNumber; + } + + public void setSequenceNumber(String sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java index 8478f26..5376850 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java @@ -47,6 +47,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; @RunWith(MockitoJUnitRunner.class) public class KinesisConsumerTest { @@ -101,6 +102,44 @@ public class KinesisConsumerTest { } @Test + public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception { + undertest.getEndpoint().setShardId("shardIdPassedAsUrlParam"); + + undertest.poll(); + + verify(kinesisClient, never()).describeStream(any(DescribeStreamRequest.class)); + + final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + + verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture()); + assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName")); + assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardIdPassedAsUrlParam")); + assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("LATEST")); + } + + @Test + public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception { + undertest.getEndpoint().setSequenceNumber("12345"); + undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + + undertest.poll(); + + final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class); + final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + + verify(kinesisClient).describeStream(describeStreamReqCap.capture()); + assertThat(describeStreamReqCap.getValue().getStreamName(), is("streamName")); + + verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture()); + assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName")); + assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardId")); + assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("AFTER_SEQUENCE_NUMBER")); + assertThat(getShardIteratorReqCap.getValue().getStartingSequenceNumber(), is("12345")); + + } + + + @Test public void itUsesTheShardIteratorOnPolls() throws Exception { undertest.poll(); http://git-wip-us.apache.org/repos/asf/camel/blob/87c48d9e/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java index a8f87c2..c6910bc 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java @@ -19,6 +19,7 @@ package org.apache.camel.component.aws.kinesis; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.ShardIteratorType; import org.apache.camel.CamelContext; +import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.SimpleRegistry; import org.junit.Before; @@ -51,12 +52,16 @@ public class KinesisEndpointTest { + "?amazonKinesisClient=#kinesisClient" + "&maxResultsPerRequest=101" + "&iteratorType=latest" + + "&shardId=abc" + + "&sequenceNumber=123" ); assertThat(endpoint.getClient(), is(amazonKinesisClient)); assertThat(endpoint.getStreamName(), is("some_stream_name")); assertThat(endpoint.getIteratorType(), is(ShardIteratorType.LATEST)); assertThat(endpoint.getMaxResultsPerRequest(), is(101)); + assertThat(endpoint.getSequenceNumber(), is("123")); + assertThat(endpoint.getShardId(), is("abc")); } @Test @@ -70,4 +75,21 @@ public class KinesisEndpointTest { assertThat(endpoint.getIteratorType(), is(ShardIteratorType.TRIM_HORIZON)); assertThat(endpoint.getMaxResultsPerRequest(), is(1)); } + + @Test(expected = ResolveEndpointFailedException.class) + public void afterSequenceNumberRequiresSequenceNumber() throws Exception { + KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + + "?amazonKinesisClient=#kinesisClient" + + "&iteratorType=AFTER_SEQUENCE_NUMBER" + ); + + } + + @Test(expected = ResolveEndpointFailedException.class) + public void atSequenceNumberRequiresSequenceNumber() throws Exception { + KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + + "?amazonKinesisClient=#kinesisClient" + + "&iteratorType=AT_SEQUENCE_NUMBER" + ); + } } \ No newline at end of file