CAMEL-9515 Add consumer handling of ExpiredShardExceptions.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c99648c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c99648c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c99648c

Branch: refs/heads/master
Commit: 9c99648c601097bda6c5b08360b1c45c345793d8
Parents: 98a33dd
Author: Candle <can...@candle.me.uk>
Authored: Wed Jan 20 10:27:47 2016 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Jan 21 10:10:26 2016 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/DdbStreamConsumer.java        | 34 ++++++--
 .../aws/ddbstream/ShardIteratorHandler.java     |  2 +-
 .../aws/ddbstream/DdbStreamConsumerTest.java    | 86 ++++++++++++++------
 .../aws/ddbstream/ShardIteratorHandlerTest.java | 13 ++-
 4 files changed, 94 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index c407df6..b3e451e 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Queue;
 
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.ExpiredIteratorException;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
@@ -48,17 +49,30 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
         this.shardIteratorHandler = shardIteratorHandler;
     }
 
+    private String lastSeenSequenceNumber;
     @Override
     protected int poll() throws Exception {
-        GetRecordsRequest req = new GetRecordsRequest()
-                .withShardIterator(shardIteratorHandler.getShardIterator())
-                .withLimit(getEndpoint().getMaxResultsPerRequest());
-        GetRecordsResult result = getClient().getRecords(req);
+        GetRecordsResult result;
+        try {
+            GetRecordsRequest req = new GetRecordsRequest()
+                        
.withShardIterator(shardIteratorHandler.getShardIterator(null))
+                        .withLimit(getEndpoint().getMaxResultsPerRequest());
+            result = getClient().getRecords(req);
+        } catch (ExpiredIteratorException e) {
+            LOG.warn("Expired Shard Iterator, attempting to resume from " + 
lastSeenSequenceNumber, e);
+            GetRecordsRequest req = new GetRecordsRequest()
+                        
.withShardIterator(shardIteratorHandler.getShardIterator(lastSeenSequenceNumber))
+                        .withLimit(getEndpoint().getMaxResultsPerRequest());
+            result = getClient().getRecords(req);
+        }
 
-        Queue<Exchange> exchanges = createExchanges(result.getRecords());
+        Queue<Exchange> exchanges = createExchanges(result.getRecords(), 
lastSeenSequenceNumber);
         int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
 
         
shardIteratorHandler.updateShardIterator(result.getNextShardIterator());
+        if (!result.getRecords().isEmpty()) {
+            lastSeenSequenceNumber = 
result.getRecords().get((result.getRecords().size()-1)).getDynamodb().getSequenceNumber();
+        }
 
         return processedExchangeCount;
     }
@@ -90,10 +104,14 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
         return (DdbStreamEndpoint) super.getEndpoint();
     }
 
-    private Queue<Exchange> createExchanges(List<Record> records) {
+    private Queue<Exchange> createExchanges(List<Record> records, String 
lastSeenSequenceNumber) {
         Queue<Exchange> exchanges = new ArrayDeque<>();
-        BigIntComparisons condition;
+        BigIntComparisons condition = null;
         BigInteger providedSeqNum = null;
+        if (lastSeenSequenceNumber != null) {
+            providedSeqNum = new BigInteger(lastSeenSequenceNumber);
+            condition = BigIntComparisons.Conditions.LT;
+        }
         switch(getEndpoint().getIteratorType()) {
         case AFTER_SEQUENCE_NUMBER:
             condition = BigIntComparisons.Conditions.LT;
@@ -103,8 +121,6 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
             condition = BigIntComparisons.Conditions.LTEQ;
             providedSeqNum = new 
BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
             break;
-        default:
-            condition = null;
         }
         for (Record record : records) {
             BigInteger recordSeqNum = new 
BigInteger(record.getDynamodb().getSequenceNumber());

http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
index 8801991..685907b 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java
@@ -43,7 +43,7 @@ class ShardIteratorHandler {
         this.endpoint = endpoint;
     }
 
-    String getShardIterator() {
+    String getShardIterator(String resumeFromSequenceNumber) {
         // either return a cached one or get a new one via a GetShardIterator 
request.
         if (currentShardIterator == null) {
             ListStreamsRequest req0 = new ListStreamsRequest()

http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
index 13e6d92..c91fc86 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
@@ -25,6 +25,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.ExpiredIteratorException;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
 import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
@@ -40,6 +41,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
@@ -47,6 +49,7 @@ import org.mockito.stubbing.Answer;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -62,6 +65,7 @@ public class DdbStreamConsumerTest {
     private final CamelContext context = new DefaultCamelContext();
     private final DdbStreamComponent component = new 
DdbStreamComponent(context);
     private final DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, 
"table_name", component);
+    private GetRecordsAnswer recordsAnswer;
 
     @Before
     public void setup() throws Exception {
@@ -82,28 +86,8 @@ public class DdbStreamConsumerTest {
         answers.put("shard_iterator_b_002", createRecords("14"));
         answers.put("shard_iterator_d_000", createRecords("21", "25"));
         answers.put("shard_iterator_d_001", createRecords("30", "35", "40"));
-        
when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(new
 Answer<GetRecordsResult>() {
-            @Override
-            public GetRecordsResult answer(InvocationOnMock invocation) throws 
Throwable {
-                final String shardIterator = ((GetRecordsRequest) 
invocation.getArguments()[0]).getShardIterator();
-                // note that HashMap returns null when there is no entry in 
the map.
-                // A null 'nextShardIterator' indicates that the shard has 
finished
-                // and we should move onto the next shard.
-                String nextShardIterator = shardIterators.get(shardIterator);
-                Matcher m = 
Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
-                Collection<Record> ans = answers.get(shardIterator);
-                if (nextShardIterator == null && m.matches()) { // last shard 
iterates forever.
-                    Integer num = Integer.parseInt(m.group(1));
-                    nextShardIterator = "shard_iterator_d_" + 
pad(Integer.toString(num + 1), 3);
-                }
-                if (null == ans) { // default to an empty list of records.
-                    ans = createRecords();
-                }
-                return new GetRecordsResult()
-                        .withRecords(ans)
-                        .withNextShardIterator(nextShardIterator);
-            }
-        });
+        recordsAnswer = new GetRecordsAnswer(shardIterators, answers);
+        
when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(recordsAnswer);
     }
 
     String pad(String num, int to) {
@@ -119,10 +103,32 @@ public class DdbStreamConsumerTest {
     }
 
     @Test
+    public void 
itResumesFromAfterTheLastSeenSequenceNumberWhenAShardIteratorHasExpired() 
throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.LATEST);
+        
when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_b_000",
 "shard_iterator_b_001", "shard_iterator_b_001");
+        Mockito.reset(amazonDynamoDBStreams);
+        when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class)))
+                .thenAnswer(recordsAnswer)
+                .thenThrow(new ExpiredIteratorException("expired shard"))
+                .thenAnswer(recordsAnswer);
+
+        undertest.poll();
+        undertest.poll();
+
+        ArgumentCaptor<Exchange> exchangeCaptor = 
ArgumentCaptor.forClass(Exchange.class);
+        verify(processor, times(3)).process(exchangeCaptor.capture(), 
any(AsyncCallback.class));
+        verify(shardIteratorHandler, times(2)).getShardIterator(null); // 
first poll. Second poll, getRecords fails with an expired shard.
+        verify(shardIteratorHandler).getShardIterator("9"); // second poll, 
with a resumeFrom.
+        
assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
 is("9"));
+        
assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
 is("11"));
+        
assertThat(exchangeCaptor.getAllValues().get(2).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
 is("13"));
+    }
+
+    @Test
     public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception {
         endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
         endpoint.setSequenceNumberProvider(new 
StaticSequenceNumberProvider("35"));
-        
when(shardIteratorHandler.getShardIterator()).thenReturn("shard_iterator_d_001",
 "shard_iterator_d_002");
+        
when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001",
 "shard_iterator_d_002");
 
         for (int i = 0; i < 10; ++i) { // poll lots.
             undertest.poll();
@@ -139,7 +145,7 @@ public class DdbStreamConsumerTest {
     public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception {
         endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
         endpoint.setSequenceNumberProvider(new 
StaticSequenceNumberProvider("35"));
-        
when(shardIteratorHandler.getShardIterator()).thenReturn("shard_iterator_d_001",
 "shard_iterator_d_002");
+        
when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001",
 "shard_iterator_d_002");
 
         for (int i = 0; i < 10; ++i) { // poll lots.
             undertest.poll();
@@ -162,5 +168,37 @@ public class DdbStreamConsumerTest {
 
         return results;
     }
+
+    private class GetRecordsAnswer implements Answer<GetRecordsResult> {
+
+        private final Map<String, String> shardIterators;
+        private final Map<String, Collection<Record>> answers;
+
+        GetRecordsAnswer(Map<String, String> shardIterators, Map<String, 
Collection<Record>> answers) {
+            this.shardIterators = shardIterators;
+            this.answers = answers;
+        }
+
+        @Override
+        public GetRecordsResult answer(InvocationOnMock invocation) throws 
Throwable {
+            final String shardIterator = ((GetRecordsRequest) 
invocation.getArguments()[0]).getShardIterator();
+            // note that HashMap returns null when there is no entry in the 
map.
+            // A null 'nextShardIterator' indicates that the shard has finished
+            // and we should move onto the next shard.
+            String nextShardIterator = shardIterators.get(shardIterator);
+            Matcher m = 
Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
+            Collection<Record> ans = answers.get(shardIterator);
+            if (nextShardIterator == null && m.matches()) { // last shard 
iterates forever.
+                Integer num = Integer.parseInt(m.group(1));
+                nextShardIterator = "shard_iterator_d_" + 
pad(Integer.toString(num + 1), 3);
+            }
+            if (null == ans) { // default to an empty list of records.
+                ans = createRecords();
+            }
+            return new GetRecordsResult()
+                    .withRecords(ans)
+                    .withNextShardIterator(nextShardIterator);
+        }
+    }
     
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
index 41b7883..c54dbef 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java
@@ -94,7 +94,7 @@ public class ShardIteratorHandlerTest {
     public void latestOnlyUsesTheLastShard() throws Exception {
         endpoint.setIteratorType(ShardIteratorType.LATEST);
 
-        String shardIterator = undertest.getShardIterator();
+        String shardIterator = undertest.getShardIterator(null);
 
         ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
         
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
@@ -107,7 +107,7 @@ public class ShardIteratorHandlerTest {
         endpoint.setIteratorType(ShardIteratorType.LATEST);
 
         undertest.updateShardIterator("bar");
-        String shardIterator = undertest.getShardIterator();
+        String shardIterator = undertest.getShardIterator(null);
 
         verify(amazonDynamoDBStreams, 
times(0)).getShardIterator(any(GetShardIteratorRequest.class));
         assertThat(shardIterator, is("bar"));
@@ -117,7 +117,7 @@ public class ShardIteratorHandlerTest {
     public void trimHorizonStartsWithTheFirstShard() throws Exception {
         endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
 
-        String shardIterator = undertest.getShardIterator();
+        String shardIterator = undertest.getShardIterator(null);
 
         ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
         
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
@@ -125,7 +125,6 @@ public class ShardIteratorHandlerTest {
         assertThat(shardIterator, is("shard_iterator_a_000"));
     }
 
-
     @Test
     public void trimHorizonWalksAllShards() throws Exception {
         endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
@@ -133,7 +132,7 @@ public class ShardIteratorHandlerTest {
         String[] shardIterators = new String[4];
 
         for (int i = 0; i < shardIterators.length; ++i) {
-            shardIterators[i] = undertest.getShardIterator();
+            shardIterators[i] = undertest.getShardIterator(null);
             undertest.updateShardIterator(null);
         }
 
@@ -152,7 +151,7 @@ public class ShardIteratorHandlerTest {
         endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
         endpoint.setSequenceNumberProvider(new 
StaticSequenceNumberProvider("12"));
 
-        String shardIterator = undertest.getShardIterator();
+        String shardIterator = undertest.getShardIterator(null);
 
         ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
         
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
@@ -165,7 +164,7 @@ public class ShardIteratorHandlerTest {
         endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
         endpoint.setSequenceNumberProvider(new 
StaticSequenceNumberProvider("16"));
 
-        String shardIterator = undertest.getShardIterator();
+        String shardIterator = undertest.getShardIterator(null);
 
         ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
         
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());

Reply via email to