Repository: camel Updated Branches: refs/heads/master e23bf831a -> 2f1c8599c
CAMEL-9515 refactor to extract the shard iterator discovery code, as it's going to get more complicated for this ticket. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d097871c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d097871c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d097871c Branch: refs/heads/master Commit: d097871c664f17fad96c19eb014fd6bd828d1357 Parents: 81c5a19 Author: Candle <can...@candle.me.uk> Authored: Mon Jan 18 09:48:11 2016 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jan 21 10:10:24 2016 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/DdbStreamConsumer.java | 86 +-------------- .../aws/ddbstream/ShardIteratorHandler.java | 110 +++++++++++++++++++ 2 files changed, 114 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d097871c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index aad6550..4bdb5c6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -22,17 +22,9 @@ import java.util.List; import java.util.Queue; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; -import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; -import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; -import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; -import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; -import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; -import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; import com.amazonaws.services.dynamodbv2.model.Record; -import com.amazonaws.services.dynamodbv2.model.Shard; -import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -45,25 +37,24 @@ import org.slf4j.LoggerFactory; public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class); - private String currentShardIterator; - private Shard currentShard; - private final ShardList shardList = new ShardList(); + private final ShardIteratorHandler shardIteratorHandler; public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) { super(endpoint, processor); + shardIteratorHandler = new ShardIteratorHandler(endpoint); } @Override protected int poll() throws Exception { GetRecordsRequest req = new GetRecordsRequest() - .withShardIterator(getShardIterator()) + .withShardIterator(shardIteratorHandler.getShardIterator()) .withLimit(getEndpoint().getMaxResultsPerRequest()); GetRecordsResult result = getClient().getRecords(req); Queue<Exchange> exchanges = createExchanges(result.getRecords()); int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); - currentShardIterator = result.getNextShardIterator(); + shardIteratorHandler.updateShardIterator(result.getNextShardIterator()); return processedExchangeCount; } @@ -95,75 +86,6 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { return (DdbStreamEndpoint) super.getEndpoint(); } - private String getShardIterator() { - // either return a cached one or get a new one via a GetShardIterator request. - if (currentShardIterator == null) { - ListStreamsRequest req0 = new ListStreamsRequest() - .withTableName(getEndpoint().getTableName()); - ListStreamsResult res0 = getClient().listStreams(req0); - final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream - DescribeStreamRequest req1 = new DescribeStreamRequest() - .withStreamArn(streamArn); - DescribeStreamResult res1 = getClient().describeStream(req1); - shardList.addAll(res1.getStreamDescription().getShards()); - - LOG.trace("Current shard is: {} (in {})", currentShard, shardList); - if (currentShard == null) { - switch(getEndpoint().getIteratorType()) { - case AFTER_SEQUENCE_NUMBER: - currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber()); - break; - case AT_SEQUENCE_NUMBER: - currentShard = shardList.atSeq(getEndpoint().getSequenceNumber()); - break; - case TRIM_HORIZON: - currentShard = shardList.first(); - break; - case LATEST: - default: - currentShard = shardList.last(); - break; - } - } else { - currentShard = shardList.nextAfter(currentShard); - } - shardList.removeOlderThan(currentShard); - LOG.trace("Next shard is: {} (in {})", currentShard, shardList); - - GetShardIteratorRequest req = new GetShardIteratorRequest() - .withStreamArn(streamArn) - .withShardId(currentShard.getShardId()) - .withShardIteratorType(getEndpoint().getIteratorType()); - switch(getEndpoint().getIteratorType()) { - case AFTER_SEQUENCE_NUMBER: - case AT_SEQUENCE_NUMBER: - // if you request with a sequence number that is LESS than the - // start of the shard, you get a HTTP 400 from AWS. - // So only add the sequence number if the endpoints - // sequence number is less than or equal to the starting - // sequence for the shard. - // Otherwise change the shart iterator type to trim_horizon - // because we get a 400 when we use one of the - // {at,after}_sequence_number iterator types and don't supply - // a sequence number. - if (BigIntComparisons.Conditions.LTEQ.matches( - new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), - new BigInteger(getEndpoint().getSequenceNumber()) - )) { - req = req.withSequenceNumber(getEndpoint().getSequenceNumber()); - } else { - req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); - } - break; - default: - } - GetShardIteratorResult result = getClient().getShardIterator(req); - currentShardIterator = result.getShardIterator(); - } - LOG.trace("Shard Iterator is: {}", currentShardIterator); - return currentShardIterator; - } - private Queue<Exchange> createExchanges(List<Record> records) { Queue<Exchange> exchanges = new ArrayDeque<>(); BigIntComparisons condition; http://git-wip-us.apache.org/repos/asf/camel/blob/d097871c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java new file mode 100644 index 0000000..42c111d --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java @@ -0,0 +1,110 @@ +package org.apache.camel.component.aws.ddbstream; + +import java.math.BigInteger; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; +import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; +import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; +import com.amazonaws.services.dynamodbv2.model.Shard; +import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ShardIteratorHandler { + private static final Logger LOG = LoggerFactory.getLogger(ShardIteratorHandler.class); + + private final DdbStreamEndpoint endpoint; + private final ShardList shardList = new ShardList(); + + private String currentShardIterator; + private Shard currentShard; + + ShardIteratorHandler(DdbStreamEndpoint endpoint) { + this.endpoint = endpoint; + } + + String getShardIterator() { + // either return a cached one or get a new one via a GetShardIterator request. + if (currentShardIterator == null) { + ListStreamsRequest req0 = new ListStreamsRequest() + .withTableName(getEndpoint().getTableName()); + ListStreamsResult res0 = getClient().listStreams(req0); + final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream + DescribeStreamRequest req1 = new DescribeStreamRequest() + .withStreamArn(streamArn); + DescribeStreamResult res1 = getClient().describeStream(req1); + shardList.addAll(res1.getStreamDescription().getShards()); + + LOG.trace("Current shard is: {} (in {})", currentShard, shardList); + if (currentShard == null) { + switch(getEndpoint().getIteratorType()) { + case AFTER_SEQUENCE_NUMBER: + currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber()); + break; + case AT_SEQUENCE_NUMBER: + currentShard = shardList.atSeq(getEndpoint().getSequenceNumber()); + break; + case TRIM_HORIZON: + currentShard = shardList.first(); + break; + case LATEST: + default: + currentShard = shardList.last(); + break; + } + } else { + currentShard = shardList.nextAfter(currentShard); + } + shardList.removeOlderThan(currentShard); + LOG.trace("Next shard is: {} (in {})", currentShard, shardList); + + GetShardIteratorRequest req = new GetShardIteratorRequest() + .withStreamArn(streamArn) + .withShardId(currentShard.getShardId()) + .withShardIteratorType(getEndpoint().getIteratorType()); + switch(getEndpoint().getIteratorType()) { + case AFTER_SEQUENCE_NUMBER: + case AT_SEQUENCE_NUMBER: + // if you request with a sequence number that is LESS than the + // start of the shard, you get a HTTP 400 from AWS. + // So only add the sequence number if the endpoints + // sequence number is less than or equal to the starting + // sequence for the shard. + // Otherwise change the shart iterator type to trim_horizon + // because we get a 400 when we use one of the + // {at,after}_sequence_number iterator types and don't supply + // a sequence number. + if (BigIntComparisons.Conditions.LTEQ.matches( + new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), + new BigInteger(getEndpoint().getSequenceNumber()) + )) { + req = req.withSequenceNumber(getEndpoint().getSequenceNumber()); + } else { + req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); + } + break; + default: + } + GetShardIteratorResult result = getClient().getShardIterator(req); + currentShardIterator = result.getShardIterator(); + } + LOG.trace("Shard Iterator is: {}", currentShardIterator); + return currentShardIterator; + } + + void updateShardIterator(String nextShardIterator) { + this.currentShardIterator = nextShardIterator; + } + + DdbStreamEndpoint getEndpoint() { + return endpoint; + } + + private AmazonDynamoDBStreams getClient() { + return getEndpoint().getClient(); + } +}