CAMEL-9515 Post implementation refactor.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2f1c8599
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2f1c8599
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2f1c8599

Branch: refs/heads/master
Commit: 2f1c8599c689146cad9ec312086b565b05299287
Parents: 258f9c9
Author: Candle <can...@candle.me.uk>
Authored: Wed Jan 20 13:16:10 2016 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Jan 21 10:10:27 2016 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamConsumer.java        | 10 ++-
 .../aws/ddbstream/ShardIteratorHandler.java     | 80 +++++++++++---------
 2 files changed, 50 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2f1c8599/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index b3e451e..07a481f 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -39,6 +39,7 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
     private static final Logger LOG = 
LoggerFactory.getLogger(DdbStreamConsumer.class);
 
     private final ShardIteratorHandler shardIteratorHandler;
+    private String lastSeenSequenceNumber;
 
     public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) {
         this(endpoint, processor, new ShardIteratorHandler(endpoint));
@@ -49,7 +50,6 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
         this.shardIteratorHandler = shardIteratorHandler;
     }
 
-    private String lastSeenSequenceNumber;
     @Override
     protected int poll() throws Exception {
         GetRecordsResult result;
@@ -65,13 +65,14 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
                         .withLimit(getEndpoint().getMaxResultsPerRequest());
             result = getClient().getRecords(req);
         }
+        List<Record> records = result.getRecords();
 
-        Queue<Exchange> exchanges = createExchanges(result.getRecords(), 
lastSeenSequenceNumber);
+        Queue<Exchange> exchanges = createExchanges(records, 
lastSeenSequenceNumber);
         int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
 
         
shardIteratorHandler.updateShardIterator(result.getNextShardIterator());
-        if (!result.getRecords().isEmpty()) {
-            lastSeenSequenceNumber = 
result.getRecords().get((result.getRecords().size()-1)).getDynamodb().getSequenceNumber();
+        if (!records.isEmpty()) {
+            lastSeenSequenceNumber = records.get(records.size() - 
1).getDynamodb().getSequenceNumber();
         }
 
         return processedExchangeCount;
@@ -121,6 +122,7 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
             condition = BigIntComparisons.Conditions.LTEQ;
             providedSeqNum = new 
BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
             break;
+        default:
         }
         for (Record record : records) {
             BigInteger recordSeqNum = new 
BigInteger(record.getDynamodb().getSequenceNumber());

http://git-wip-us.apache.org/repos/asf/camel/blob/2f1c8599/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
index 1d20f8b..de6d242 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
@@ -47,21 +47,22 @@ class ShardIteratorHandler {
         ShardIteratorType iteratorType = getEndpoint().getIteratorType();
         String sequenceNumber = getEndpoint().getSequenceNumber();
         if (resumeFromSequenceNumber != null) {
-            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+            // Reset things as we're in an error condition.
             currentShard = null;
             currentShardIterator = null;
+            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
             sequenceNumber = resumeFromSequenceNumber;
         }
         // either return a cached one or get a new one via a GetShardIterator 
request.
         if (currentShardIterator == null) {
-            ListStreamsRequest req0 = new ListStreamsRequest()
-                    .withTableName(getEndpoint().getTableName());
-            ListStreamsResult res0 = getClient().listStreams(req0);
-            final String streamArn = res0.getStreams().get(0).getStreamArn(); 
// XXX assumes there is only one stream
-            DescribeStreamRequest req1 = new DescribeStreamRequest()
-                    .withStreamArn(streamArn);
-            DescribeStreamResult res1 = getClient().describeStream(req1);
-            shardList.addAll(res1.getStreamDescription().getShards());
+            ListStreamsResult streamsListResult = getClient().listStreams(
+                    new 
ListStreamsRequest().withTableName(getEndpoint().getTableName())
+            );
+            final String streamArn = 
streamsListResult.getStreams().get(0).getStreamArn(); // XXX assumes there is 
only one stream
+            DescribeStreamResult streamDescriptionResult = 
getClient().describeStream(
+                    new DescribeStreamRequest().withStreamArn(streamArn)
+            );
+            
shardList.addAll(streamDescriptionResult.getStreamDescription().getShards());
 
             LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
             if (currentShard == null) {
@@ -71,40 +72,47 @@ class ShardIteratorHandler {
             }
             shardList.removeOlderThan(currentShard);
             LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
-            GetShardIteratorRequest req = new GetShardIteratorRequest()
-                    .withStreamArn(streamArn)
-                    .withShardId(currentShard.getShardId())
-                    .withShardIteratorType(iteratorType);
-            if (getEndpoint().getIteratorType() == 
ShardIteratorType.AFTER_SEQUENCE_NUMBER
-                    || getEndpoint().getIteratorType() == 
ShardIteratorType.AFTER_SEQUENCE_NUMBER
-                    || resumeFromSequenceNumber != null
-                    ) {
-                // if you request with a sequence number that is LESS than the
-                // start of the shard, you get a HTTP 400 from AWS.
-                // So only add the sequence number if the endpoints
-                // sequence number is less than or equal to the starting
-                // sequence for the shard.
-                // Otherwise change the shart iterator type to trim_horizon
-                // because we get a 400 when we use one of the
-                // {at,after}_sequence_number iterator types and don't supply
-                // a sequence number.
 
-                if (BigIntComparisons.Conditions.LTEQ.matches(
-                        new 
BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
-                        new BigInteger(sequenceNumber)
-                )) {
-                    req = req.withSequenceNumber(sequenceNumber);
-                } else {
-                    req = 
req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
-                }
-            }
-            GetShardIteratorResult result = getClient().getShardIterator(req);
+            GetShardIteratorResult result = getClient().getShardIterator(
+                    buildGetShardIteratorRequest(streamArn, iteratorType, 
sequenceNumber)
+            );
             currentShardIterator = result.getShardIterator();
         }
         LOG.trace("Shard Iterator is: {}", currentShardIterator);
         return currentShardIterator;
     }
 
+    private GetShardIteratorRequest buildGetShardIteratorRequest(final String 
streamArn, ShardIteratorType iteratorType, String sequenceNumber) {
+        GetShardIteratorRequest req = new GetShardIteratorRequest()
+                .withStreamArn(streamArn)
+                .withShardId(currentShard.getShardId())
+                .withShardIteratorType(iteratorType);
+        switch (iteratorType) {
+        case AFTER_SEQUENCE_NUMBER:
+        case AT_SEQUENCE_NUMBER:
+            // if you request with a sequence number that is LESS than the
+            // start of the shard, you get a HTTP 400 from AWS.
+            // So only add the sequence number if the endpoints
+            // sequence number is less than or equal to the starting
+            // sequence for the shard.
+            // Otherwise change the shart iterator type to trim_horizon
+            // because we get a 400 when we use one of the
+            // {at,after}_sequence_number iterator types and don't supply
+            // a sequence number.
+            if (BigIntComparisons.Conditions.LTEQ.matches(
+                    new 
BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
+                    new BigInteger(sequenceNumber)
+            )) {
+                req = req.withSequenceNumber(sequenceNumber);
+            } else {
+                req = 
req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
+            }
+            break;
+        default:
+        }
+        return req;
+    }
+
     private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) {
         switch(type) {
         case AFTER_SEQUENCE_NUMBER:

Reply via email to