Repository: camel
Updated Branches:
  refs/heads/master e23bf831a -> 2f1c8599c


CAMEL-9515 refactor to extract the shard iterator discovery code, as it's going 
to get more complicated for this ticket.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d097871c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d097871c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d097871c

Branch: refs/heads/master
Commit: d097871c664f17fad96c19eb014fd6bd828d1357
Parents: 81c5a19
Author: Candle <can...@candle.me.uk>
Authored: Mon Jan 18 09:48:11 2016 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Jan 21 10:10:24 2016 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamConsumer.java        |  86 +--------------
 .../aws/ddbstream/ShardIteratorHandler.java     | 110 +++++++++++++++++++
 2 files changed, 114 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d097871c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index aad6550..4bdb5c6 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -22,17 +22,9 @@ import java.util.List;
 import java.util.Queue;
 
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
-import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
-import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
-import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
-import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
-import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
-import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
-import com.amazonaws.services.dynamodbv2.model.Shard;
-import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -45,25 +37,24 @@ import org.slf4j.LoggerFactory;
 public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
     private static final Logger LOG = 
LoggerFactory.getLogger(DdbStreamConsumer.class);
 
-    private String currentShardIterator;
-    private Shard currentShard;
-    private final ShardList shardList = new ShardList();
+    private final ShardIteratorHandler shardIteratorHandler;
 
     public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
+        shardIteratorHandler = new ShardIteratorHandler(endpoint);
     }
 
     @Override
     protected int poll() throws Exception {
         GetRecordsRequest req = new GetRecordsRequest()
-                .withShardIterator(getShardIterator())
+                .withShardIterator(shardIteratorHandler.getShardIterator())
                 .withLimit(getEndpoint().getMaxResultsPerRequest());
         GetRecordsResult result = getClient().getRecords(req);
 
         Queue<Exchange> exchanges = createExchanges(result.getRecords());
         int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
 
-        currentShardIterator = result.getNextShardIterator();
+        
shardIteratorHandler.updateShardIterator(result.getNextShardIterator());
 
         return processedExchangeCount;
     }
@@ -95,75 +86,6 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
         return (DdbStreamEndpoint) super.getEndpoint();
     }
 
-    private String getShardIterator() {
-        // either return a cached one or get a new one via a GetShardIterator 
request.
-        if (currentShardIterator == null) {
-            ListStreamsRequest req0 = new ListStreamsRequest()
-                    .withTableName(getEndpoint().getTableName());
-            ListStreamsResult res0 = getClient().listStreams(req0);
-            final String streamArn = res0.getStreams().get(0).getStreamArn(); 
// XXX assumes there is only one stream
-            DescribeStreamRequest req1 = new DescribeStreamRequest()
-                    .withStreamArn(streamArn);
-            DescribeStreamResult res1 = getClient().describeStream(req1);
-            shardList.addAll(res1.getStreamDescription().getShards());
-
-            LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
-            if (currentShard == null) {
-                switch(getEndpoint().getIteratorType()) {
-                case AFTER_SEQUENCE_NUMBER:
-                    currentShard = 
shardList.afterSeq(getEndpoint().getSequenceNumber());
-                    break;
-                case AT_SEQUENCE_NUMBER:
-                    currentShard = 
shardList.atSeq(getEndpoint().getSequenceNumber());
-                    break;
-                case TRIM_HORIZON:
-                    currentShard = shardList.first();
-                    break;
-                case LATEST:
-                default:
-                    currentShard = shardList.last();
-                    break;
-                }
-            } else {
-                currentShard = shardList.nextAfter(currentShard);
-            }
-            shardList.removeOlderThan(currentShard);
-            LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
-
-            GetShardIteratorRequest req = new GetShardIteratorRequest()
-                    .withStreamArn(streamArn)
-                    .withShardId(currentShard.getShardId())
-                    .withShardIteratorType(getEndpoint().getIteratorType());
-            switch(getEndpoint().getIteratorType()) {
-            case AFTER_SEQUENCE_NUMBER:
-            case AT_SEQUENCE_NUMBER:
-                // if you request with a sequence number that is LESS than the
-                // start of the shard, you get a HTTP 400 from AWS.
-                // So only add the sequence number if the endpoints
-                // sequence number is less than or equal to the starting
-                // sequence for the shard.
-                // Otherwise change the shart iterator type to trim_horizon
-                // because we get a 400 when we use one of the
-                // {at,after}_sequence_number iterator types and don't supply
-                // a sequence number.
-                if (BigIntComparisons.Conditions.LTEQ.matches(
-                        new 
BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
-                        new BigInteger(getEndpoint().getSequenceNumber())
-                )) {
-                    req = 
req.withSequenceNumber(getEndpoint().getSequenceNumber());
-                } else {
-                    req = 
req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
-                }
-                break;
-            default:
-            }
-            GetShardIteratorResult result = getClient().getShardIterator(req);
-            currentShardIterator = result.getShardIterator();
-        }
-        LOG.trace("Shard Iterator is: {}", currentShardIterator);
-        return currentShardIterator;
-    }
-
     private Queue<Exchange> createExchanges(List<Record> records) {
         Queue<Exchange> exchanges = new ArrayDeque<>();
         BigIntComparisons condition;

http://git-wip-us.apache.org/repos/asf/camel/blob/d097871c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
new file mode 100644
index 0000000..42c111d
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
@@ -0,0 +1,110 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Shard;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardIteratorHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ShardIteratorHandler.class);
+
+    private final DdbStreamEndpoint endpoint;
+    private final ShardList shardList = new ShardList();
+
+    private String currentShardIterator;
+    private Shard currentShard;
+
+    ShardIteratorHandler(DdbStreamEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    String getShardIterator() {
+        // either return a cached one or get a new one via a GetShardIterator 
request.
+        if (currentShardIterator == null) {
+            ListStreamsRequest req0 = new ListStreamsRequest()
+                    .withTableName(getEndpoint().getTableName());
+            ListStreamsResult res0 = getClient().listStreams(req0);
+            final String streamArn = res0.getStreams().get(0).getStreamArn(); 
// XXX assumes there is only one stream
+            DescribeStreamRequest req1 = new DescribeStreamRequest()
+                    .withStreamArn(streamArn);
+            DescribeStreamResult res1 = getClient().describeStream(req1);
+            shardList.addAll(res1.getStreamDescription().getShards());
+
+            LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
+            if (currentShard == null) {
+                switch(getEndpoint().getIteratorType()) {
+                case AFTER_SEQUENCE_NUMBER:
+                    currentShard = 
shardList.afterSeq(getEndpoint().getSequenceNumber());
+                    break;
+                case AT_SEQUENCE_NUMBER:
+                    currentShard = 
shardList.atSeq(getEndpoint().getSequenceNumber());
+                    break;
+                case TRIM_HORIZON:
+                    currentShard = shardList.first();
+                    break;
+                case LATEST:
+                default:
+                    currentShard = shardList.last();
+                    break;
+                }
+            } else {
+                currentShard = shardList.nextAfter(currentShard);
+            }
+            shardList.removeOlderThan(currentShard);
+            LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
+
+            GetShardIteratorRequest req = new GetShardIteratorRequest()
+                    .withStreamArn(streamArn)
+                    .withShardId(currentShard.getShardId())
+                    .withShardIteratorType(getEndpoint().getIteratorType());
+            switch(getEndpoint().getIteratorType()) {
+            case AFTER_SEQUENCE_NUMBER:
+            case AT_SEQUENCE_NUMBER:
+                // if you request with a sequence number that is LESS than the
+                // start of the shard, you get a HTTP 400 from AWS.
+                // So only add the sequence number if the endpoints
+                // sequence number is less than or equal to the starting
+                // sequence for the shard.
+                // Otherwise change the shart iterator type to trim_horizon
+                // because we get a 400 when we use one of the
+                // {at,after}_sequence_number iterator types and don't supply
+                // a sequence number.
+                if (BigIntComparisons.Conditions.LTEQ.matches(
+                        new 
BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
+                        new BigInteger(getEndpoint().getSequenceNumber())
+                )) {
+                    req = 
req.withSequenceNumber(getEndpoint().getSequenceNumber());
+                } else {
+                    req = 
req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
+                }
+                break;
+            default:
+            }
+            GetShardIteratorResult result = getClient().getShardIterator(req);
+            currentShardIterator = result.getShardIterator();
+        }
+        LOG.trace("Shard Iterator is: {}", currentShardIterator);
+        return currentShardIterator;
+    }
+
+    void updateShardIterator(String nextShardIterator) {
+        this.currentShardIterator = nextShardIterator;
+    }
+
+    DdbStreamEndpoint getEndpoint() {
+        return endpoint;
+    }
+   
+    private AmazonDynamoDBStreams getClient() {
+        return getEndpoint().getClient();
+    }
+}

Reply via email to