Correctly support the LATEST shard iterator type by starting with the last 
shard in the stream descrption.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e3b86b97
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e3b86b97
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e3b86b97

Branch: refs/heads/master
Commit: e3b86b977bbcda16d35c936dab77ed7b07f90e6e
Parents: 7180893
Author: Candle <[email protected]>
Authored: Wed Dec 16 10:34:21 2015 +0000
Committer: Claus Ibsen <[email protected]>
Committed: Wed Dec 16 14:19:13 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/DdbStreamConsumer.java  | 10 +++++++++-
 .../component/aws/ddbstream/DdbStreamEndpoint.java  | 16 +++++++++++++++-
 .../camel/component/aws/ddbstream/ShardList.java    | 13 +++++++++++++
 .../component/aws/ddbstream/ShardListTest.java      |  8 ++++++++
 4 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 0a6a83c..f5223c0 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -107,7 +107,15 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
 
             LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
             if (currentShard == null) {
-                currentShard = shardList.first();
+                switch(getEndpoint().getIteratorType()) {
+                case TRIM_HORIZON:
+                    currentShard = shardList.first();
+                    break;
+                default:
+                case LATEST:
+                    currentShard = shardList.last();
+                    break;
+                }
             } else {
                 currentShard = shardList.nextAfter(currentShard);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 66a7461..543c432 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -29,7 +29,9 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
-@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", 
consumerOnly = true, syntax = "aws-ddbstream:tableName", consumerClass = 
DdbStreamConsumer.class, label = "cloud,messaging,streams")
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams",
+        consumerOnly = true, syntax = "aws-ddbstream:tableName",
+        consumerClass = DdbStreamConsumer.class, label = 
"cloud,messaging,streams")
 public class DdbStreamEndpoint extends ScheduledPollEndpoint {
 
     @UriPath(label = "consumer", description = "Name of the dynamodb table")
@@ -56,6 +58,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     // This can be done by having the type of the parameter an interface
     // and supplying a default implementation and a converter from a 
long/String
     // to an instance of this interface.
+    // Note that the shard list needs to have the ability to start at the shard
+    // that includes the supplied sequence number
 
     public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent 
component) {
         super(uri, component);
@@ -86,6 +90,16 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint 
{
         return true;
     }
 
+    @Override
+    public String toString() {
+        return "DdbStreamEndpoint{"
+                + "tableName=" + tableName
+                + ", amazonDynamoDbStreamsClient=[redacted], 
maxResultsPerRequest=" + maxResultsPerRequest
+                + ", iteratorType="
+                + iteratorType + ", uri=" + getEndpointUri()
+                + '}';
+    }
+
     AmazonDynamoDBStreams getClient() {
         return amazonDynamoDbStreamsClient;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 6e804f5..a0df179 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -57,6 +57,19 @@ class ShardList {
         throw new IllegalStateException("Unable to find an unparented shard in 
" + shards);
     }
 
+    Shard last() {
+        Map<String, Shard> shardsByParent = new HashMap<>();
+        for (Shard shard : shards.values()) {
+            shardsByParent.put(shard.getParentShardId(), shard);
+        }
+        for (Shard shard : shards.values()) {
+            if (!shardsByParent.containsKey(shard.getShardId())) {
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find a shard with no 
children " + shards);
+    }
+
     /**
      * Removes shards that are older than the provided shard.
      * Does not remove the provided shard.

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 60f3d46..1b7249a 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -102,6 +102,14 @@ public class ShardListTest {
     }
 
     @Test
+    public void lastShardGetsTheShardWithNoChildren() throws Exception {
+        ShardList shards = new ShardList();
+        shards.addAll(createShards("a", "b", "c", "d"));
+
+        assertThat(shards.last().getShardId(), is("d"));
+    }
+
+    @Test
     public void removingShards() throws Exception {
         ShardList shards = new ShardList();
         shards.addAll(createShards(null, "a", "b", "c", "d"));

Reply via email to