Make the main DynamoDB Streams (ddbstream) consumer aware of the shard list, so that it selects its shard from the list instead of always using the first shard returned.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ceaef105 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ceaef105 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ceaef105 Branch: refs/heads/master Commit: ceaef105323d031ef92751d2744ad7a0f0be2c08 Parents: fdadd15 Author: Candle <can...@candle.me.uk> Authored: Fri Dec 11 16:25:48 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 16 14:19:12 2015 +0100 ---------------------------------------------------------------------- .../component/aws/ddbstream/DdbStreamConsumer.java | 15 ++++++++++++++- .../camel/component/aws/ddbstream/ShardList.java | 11 +++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index 88d2ba5..d4c9ac7 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -26,6 +26,7 @@ import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; import com.amazonaws.services.dynamodbv2.model.Record; +import com.amazonaws.services.dynamodbv2.model.Shard; import java.util.ArrayDeque; import java.util.List; import java.util.Queue; @@ -42,6 +43,8 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class); private String currentShardIterator = null; + private Shard currentShard; + private ShardList shardList = new ShardList(); public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -105,10 +108,20 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { .withStreamArn(streamArn) ; DescribeStreamResult res1 = getClient().describeStream(req1); + shardList.addAll(res1.getStreamDescription().getShards()); + + LOG.trace("Current shard is: {} (in {})", currentShard, shardList); + if (currentShard == null) { + currentShard = shardList.first(); + } else { + currentShard = shardList.nextAfter(currentShard); + } + shardList.removeOlderThan(currentShard); + LOG.trace("Next shard is: {} (in {})", currentShard, shardList); GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamArn(streamArn) - .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard + .withShardId(currentShard.getShardId()) .withShardIteratorType(getEndpoint().getIteratorType()) ; GetShardIteratorResult result = getClient().getShardIterator(req); http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java index fb188f4..a85e703 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java @@ -4,8 +4,11 @@ import com.amazonaws.services.dynamodbv2.model.Shard; import 
java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class ShardList { + private final Logger LOG = LoggerFactory.getLogger(ShardList.class); private final Map<String, Shard> shards = new HashMap<>(); @@ -45,13 +48,21 @@ class ShardList { void removeOlderThan(Shard removeBefore) { String current = removeBefore.getParentShardId(); + int removedShards = 0; while (current != null) { Shard s = shards.remove(current); if (s == null) { current = null; } else { + removedShards++; current = s.getParentShardId(); } } + LOG.trace("removed {} shards from the store, new size is {}", removedShards, shards.size()); + } + + @Override + public String toString() { + return "ShardList{" + "shards=" + shards + '}'; } } \ No newline at end of file