Correctly support the LATEST shard iterator type by starting with the last shard in the stream description.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e3b86b97 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e3b86b97 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e3b86b97 Branch: refs/heads/master Commit: e3b86b977bbcda16d35c936dab77ed7b07f90e6e Parents: 7180893 Author: Candle <[email protected]> Authored: Wed Dec 16 10:34:21 2015 +0000 Committer: Claus Ibsen <[email protected]> Committed: Wed Dec 16 14:19:13 2015 +0100 ---------------------------------------------------------------------- .../component/aws/ddbstream/DdbStreamConsumer.java | 10 +++++++++- .../component/aws/ddbstream/DdbStreamEndpoint.java | 16 +++++++++++++++- .../camel/component/aws/ddbstream/ShardList.java | 13 +++++++++++++ .../component/aws/ddbstream/ShardListTest.java | 8 ++++++++ 4 files changed, 45 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index 0a6a83c..f5223c0 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -107,7 +107,15 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { LOG.trace("Current shard is: {} (in {})", currentShard, shardList); if (currentShard == null) { - currentShard = shardList.first(); + switch(getEndpoint().getIteratorType()) { + case TRIM_HORIZON: + currentShard = shardList.first(); + break; + default: + 
case LATEST: + currentShard = shardList.last(); + break; + } } else { currentShard = shardList.nextAfter(currentShard); } http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java index 66a7461..543c432 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java @@ -29,7 +29,9 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; -@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", consumerOnly = true, syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams") +@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", + consumerOnly = true, syntax = "aws-ddbstream:tableName", + consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams") public class DdbStreamEndpoint extends ScheduledPollEndpoint { @UriPath(label = "consumer", description = "Name of the dynamodb table") @@ -56,6 +58,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint { // This can be done by having the type of the parameter an interface // and supplying a default implementation and a converter from a long/String // to an instance of this interface. 
+ // Note that the shard list needs to have the ability to start at the shard + // that includes the supplied sequence number public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) { super(uri, component); @@ -86,6 +90,16 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint { return true; } + @Override + public String toString() { + return "DdbStreamEndpoint{" + + "tableName=" + tableName + + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + + ", iteratorType=" + + iteratorType + ", uri=" + getEndpointUri() + + '}'; + } + AmazonDynamoDBStreams getClient() { return amazonDynamoDbStreamsClient; } http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java index 6e804f5..a0df179 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java @@ -57,6 +57,19 @@ class ShardList { throw new IllegalStateException("Unable to find an unparented shard in " + shards); } + Shard last() { + Map<String, Shard> shardsByParent = new HashMap<>(); + for (Shard shard : shards.values()) { + shardsByParent.put(shard.getParentShardId(), shard); + } + for (Shard shard : shards.values()) { + if (!shardsByParent.containsKey(shard.getShardId())) { + return shard; + } + } + throw new IllegalStateException("Unable to find a shard with no children " + shards); + } + /** * Removes shards that are older than the provided shard. * Does not remove the provided shard. 
http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java index 60f3d46..1b7249a 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java @@ -102,6 +102,14 @@ public class ShardListTest { } @Test + public void lastShardGetsTheShardWithNoChildren() throws Exception { + ShardList shards = new ShardList(); + shards.addAll(createShards("a", "b", "c", "d")); + + assertThat(shards.last().getShardId(), is("d")); + } + + @Test public void removingShards() throws Exception { ShardList shards = new ShardList(); shards.addAll(createShards(null, "a", "b", "c", "d"));
