CAMEL-9515 Add consumer handling of ExpiredIteratorException (expired shard iterators).
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c99648c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c99648c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c99648c Branch: refs/heads/master Commit: 9c99648c601097bda6c5b08360b1c45c345793d8 Parents: 98a33dd Author: Candle <can...@candle.me.uk> Authored: Wed Jan 20 10:27:47 2016 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jan 21 10:10:26 2016 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/DdbStreamConsumer.java | 34 ++++++-- .../aws/ddbstream/ShardIteratorHandler.java | 2 +- .../aws/ddbstream/DdbStreamConsumerTest.java | 86 ++++++++++++++------ .../aws/ddbstream/ShardIteratorHandlerTest.java | 13 ++- 4 files changed, 94 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index c407df6..b3e451e 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Queue; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.ExpiredIteratorException; import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; import 
com.amazonaws.services.dynamodbv2.model.Record; @@ -48,17 +49,30 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { this.shardIteratorHandler = shardIteratorHandler; } + private String lastSeenSequenceNumber; @Override protected int poll() throws Exception { - GetRecordsRequest req = new GetRecordsRequest() - .withShardIterator(shardIteratorHandler.getShardIterator()) - .withLimit(getEndpoint().getMaxResultsPerRequest()); - GetRecordsResult result = getClient().getRecords(req); + GetRecordsResult result; + try { + GetRecordsRequest req = new GetRecordsRequest() + .withShardIterator(shardIteratorHandler.getShardIterator(null)) + .withLimit(getEndpoint().getMaxResultsPerRequest()); + result = getClient().getRecords(req); + } catch (ExpiredIteratorException e) { + LOG.warn("Expired Shard Iterator, attempting to resume from " + lastSeenSequenceNumber, e); + GetRecordsRequest req = new GetRecordsRequest() + .withShardIterator(shardIteratorHandler.getShardIterator(lastSeenSequenceNumber)) + .withLimit(getEndpoint().getMaxResultsPerRequest()); + result = getClient().getRecords(req); + } - Queue<Exchange> exchanges = createExchanges(result.getRecords()); + Queue<Exchange> exchanges = createExchanges(result.getRecords(), lastSeenSequenceNumber); int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); shardIteratorHandler.updateShardIterator(result.getNextShardIterator()); + if (!result.getRecords().isEmpty()) { + lastSeenSequenceNumber = result.getRecords().get((result.getRecords().size()-1)).getDynamodb().getSequenceNumber(); + } return processedExchangeCount; } @@ -90,10 +104,14 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { return (DdbStreamEndpoint) super.getEndpoint(); } - private Queue<Exchange> createExchanges(List<Record> records) { + private Queue<Exchange> createExchanges(List<Record> records, String lastSeenSequenceNumber) { Queue<Exchange> exchanges = new ArrayDeque<>(); - BigIntComparisons 
condition; + BigIntComparisons condition = null; BigInteger providedSeqNum = null; + if (lastSeenSequenceNumber != null) { + providedSeqNum = new BigInteger(lastSeenSequenceNumber); + condition = BigIntComparisons.Conditions.LT; + } switch(getEndpoint().getIteratorType()) { case AFTER_SEQUENCE_NUMBER: condition = BigIntComparisons.Conditions.LT; @@ -103,8 +121,6 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { condition = BigIntComparisons.Conditions.LTEQ; providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber()); break; - default: - condition = null; } for (Record record : records) { BigInteger recordSeqNum = new BigInteger(record.getDynamodb().getSequenceNumber()); http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java index 8801991..685907b 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java @@ -43,7 +43,7 @@ class ShardIteratorHandler { this.endpoint = endpoint; } - String getShardIterator() { + String getShardIterator(String resumeFromSequenceNumber) { // either return a cached one or get a new one via a GetShardIterator request. 
if (currentShardIterator == null) { ListStreamsRequest req0 = new ListStreamsRequest() http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java index 13e6d92..c91fc86 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java @@ -25,6 +25,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.ExpiredIteratorException; import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; import com.amazonaws.services.dynamodbv2.model.Record; @@ -40,6 +41,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; @@ -47,6 +49,7 @@ import org.mockito.stubbing.Answer; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -62,6 +65,7 @@ public class DdbStreamConsumerTest { private final CamelContext context = new DefaultCamelContext(); private final DdbStreamComponent 
component = new DdbStreamComponent(context); private final DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, "table_name", component); + private GetRecordsAnswer recordsAnswer; @Before public void setup() throws Exception { @@ -82,28 +86,8 @@ public class DdbStreamConsumerTest { answers.put("shard_iterator_b_002", createRecords("14")); answers.put("shard_iterator_d_000", createRecords("21", "25")); answers.put("shard_iterator_d_001", createRecords("30", "35", "40")); - when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(new Answer<GetRecordsResult>() { - @Override - public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable { - final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator(); - // note that HashMap returns null when there is no entry in the map. - // A null 'nextShardIterator' indicates that the shard has finished - // and we should move onto the next shard. - String nextShardIterator = shardIterators.get(shardIterator); - Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator); - Collection<Record> ans = answers.get(shardIterator); - if (nextShardIterator == null && m.matches()) { // last shard iterates forever. - Integer num = Integer.parseInt(m.group(1)); - nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num + 1), 3); - } - if (null == ans) { // default to an empty list of records. 
- ans = createRecords(); - } - return new GetRecordsResult() - .withRecords(ans) - .withNextShardIterator(nextShardIterator); - } - }); + recordsAnswer = new GetRecordsAnswer(shardIterators, answers); + when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(recordsAnswer); } String pad(String num, int to) { @@ -119,10 +103,32 @@ public class DdbStreamConsumerTest { } @Test + public void itResumesFromAfterTheLastSeenSequenceNumberWhenAShardIteratorHasExpired() throws Exception { + endpoint.setIteratorType(ShardIteratorType.LATEST); + when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_b_000", "shard_iterator_b_001", "shard_iterator_b_001"); + Mockito.reset(amazonDynamoDBStreams); + when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))) + .thenAnswer(recordsAnswer) + .thenThrow(new ExpiredIteratorException("expired shard")) + .thenAnswer(recordsAnswer); + + undertest.poll(); + undertest.poll(); + + ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); + verify(processor, times(3)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); + verify(shardIteratorHandler, times(2)).getShardIterator(null); // first poll. Second poll, getRecords fails with an expired shard. + verify(shardIteratorHandler).getShardIterator("9"); // second poll, with a resumeFrom. 
+ assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("9")); + assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("11")); + assertThat(exchangeCaptor.getAllValues().get(2).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("13")); + } + + @Test public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception { endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); - when(shardIteratorHandler.getShardIterator()).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); + when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. undertest.poll(); @@ -139,7 +145,7 @@ public class DdbStreamConsumerTest { public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception { endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); - when(shardIteratorHandler.getShardIterator()).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); + when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. 
undertest.poll(); @@ -162,5 +168,37 @@ public class DdbStreamConsumerTest { return results; } + + private class GetRecordsAnswer implements Answer<GetRecordsResult> { + + private final Map<String, String> shardIterators; + private final Map<String, Collection<Record>> answers; + + GetRecordsAnswer(Map<String, String> shardIterators, Map<String, Collection<Record>> answers) { + this.shardIterators = shardIterators; + this.answers = answers; + } + + @Override + public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable { + final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator(); + // note that HashMap returns null when there is no entry in the map. + // A null 'nextShardIterator' indicates that the shard has finished + // and we should move onto the next shard. + String nextShardIterator = shardIterators.get(shardIterator); + Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator); + Collection<Record> ans = answers.get(shardIterator); + if (nextShardIterator == null && m.matches()) { // last shard iterates forever. + Integer num = Integer.parseInt(m.group(1)); + nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num + 1), 3); + } + if (null == ans) { // default to an empty list of records. 
+ ans = createRecords(); + } + return new GetRecordsResult() + .withRecords(ans) + .withNextShardIterator(nextShardIterator); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/9c99648c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java index 41b7883..c54dbef 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandlerTest.java @@ -94,7 +94,7 @@ public class ShardIteratorHandlerTest { public void latestOnlyUsesTheLastShard() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); - String shardIterator = undertest.getShardIterator(); + String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); @@ -107,7 +107,7 @@ public class ShardIteratorHandlerTest { endpoint.setIteratorType(ShardIteratorType.LATEST); undertest.updateShardIterator("bar"); - String shardIterator = undertest.getShardIterator(); + String shardIterator = undertest.getShardIterator(null); verify(amazonDynamoDBStreams, times(0)).getShardIterator(any(GetShardIteratorRequest.class)); assertThat(shardIterator, is("bar")); @@ -117,7 +117,7 @@ public class ShardIteratorHandlerTest { public void trimHorizonStartsWithTheFirstShard() throws Exception { endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); - String shardIterator = undertest.getShardIterator(); + 
String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); @@ -125,7 +125,6 @@ public class ShardIteratorHandlerTest { assertThat(shardIterator, is("shard_iterator_a_000")); } - @Test public void trimHorizonWalksAllShards() throws Exception { endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); @@ -133,7 +132,7 @@ public class ShardIteratorHandlerTest { String[] shardIterators = new String[4]; for (int i = 0; i < shardIterators.length; ++i) { - shardIterators[i] = undertest.getShardIterator(); + shardIterators[i] = undertest.getShardIterator(null); undertest.updateShardIterator(null); } @@ -152,7 +151,7 @@ public class ShardIteratorHandlerTest { endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12")); - String shardIterator = undertest.getShardIterator(); + String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); @@ -165,7 +164,7 @@ public class ShardIteratorHandlerTest { endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16")); - String shardIterator = undertest.getShardIterator(); + String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());