Add an understanding of the shard list to the main dynamodbstream consumer.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ceaef105
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ceaef105
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ceaef105

Branch: refs/heads/master
Commit: ceaef105323d031ef92751d2744ad7a0f0be2c08
Parents: fdadd15
Author: Candle <can...@candle.me.uk>
Authored: Fri Dec 11 16:25:48 2015 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Dec 16 14:19:12 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/DdbStreamConsumer.java   | 15 ++++++++++++++-
 .../camel/component/aws/ddbstream/ShardList.java     | 11 +++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 88d2ba5..d4c9ac7 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -26,6 +26,7 @@ import 
com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
 import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
 import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.Shard;
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Queue;
@@ -42,6 +43,8 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
     private static final Logger LOG = 
LoggerFactory.getLogger(DdbStreamConsumer.class);
 
     private String currentShardIterator = null;
+    private Shard currentShard;
+    private ShardList shardList = new ShardList();
 
     public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -105,10 +108,20 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
                     .withStreamArn(streamArn)
                     ;
             DescribeStreamResult res1 = getClient().describeStream(req1);
+            shardList.addAll(res1.getStreamDescription().getShards());
+
+            LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
+            if (currentShard == null) {
+                currentShard = shardList.first();
+            } else {
+                currentShard = shardList.nextAfter(currentShard);
+            }
+            shardList.removeOlderThan(currentShard);
+            LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
 
             GetShardIteratorRequest req = new GetShardIteratorRequest()
                     .withStreamArn(streamArn)
-                    
.withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // 
XXX only uses the first shard
+                    .withShardId(currentShard.getShardId())
                     .withShardIteratorType(getEndpoint().getIteratorType())
                     ;
             GetShardIteratorResult result = getClient().getShardIterator(req);

http://git-wip-us.apache.org/repos/asf/camel/blob/ceaef105/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index fb188f4..a85e703 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -4,8 +4,11 @@ import com.amazonaws.services.dynamodbv2.model.Shard;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class ShardList {
+    private final Logger LOG = LoggerFactory.getLogger(ShardList.class);
 
     private final Map<String, Shard> shards = new HashMap<>();
 
@@ -45,13 +48,21 @@ class ShardList {
     void removeOlderThan(Shard removeBefore) {
         String current = removeBefore.getParentShardId();
 
+        int removedShards = 0;
         while (current != null) {
             Shard s = shards.remove(current);
             if (s == null) {
                 current = null;
             } else {
+                removedShards++;
                 current = s.getParentShardId();
             }
         }
+        LOG.trace("removed {} shards from the store, new size is {}", 
removedShards, shards.size());
+    }
+
+    @Override
+    public String toString() {
+        return "ShardList{" + "shards=" + shards + '}';
     }
 }
\ No newline at end of file

Reply via email to