CAMEL-9515 Post implementation refactor.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2f1c8599 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2f1c8599 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2f1c8599 Branch: refs/heads/master Commit: 2f1c8599c689146cad9ec312086b565b05299287 Parents: 258f9c9 Author: Candle <can...@candle.me.uk> Authored: Wed Jan 20 13:16:10 2016 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jan 21 10:10:27 2016 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/DdbStreamConsumer.java | 10 ++- .../aws/ddbstream/ShardIteratorHandler.java | 80 +++++++++++--------- 2 files changed, 50 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2f1c8599/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index b3e451e..07a481f 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -39,6 +39,7 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class); private final ShardIteratorHandler shardIteratorHandler; + private String lastSeenSequenceNumber; public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) { this(endpoint, processor, new ShardIteratorHandler(endpoint)); @@ -49,7 +50,6 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { this.shardIteratorHandler = shardIteratorHandler; } - private String lastSeenSequenceNumber; @Override protected int poll() throws Exception { GetRecordsResult result; @@ -65,13 +65,14 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { .withLimit(getEndpoint().getMaxResultsPerRequest()); result = getClient().getRecords(req); } + List<Record> records = result.getRecords(); - Queue<Exchange> exchanges = createExchanges(result.getRecords(), lastSeenSequenceNumber); + Queue<Exchange> exchanges = createExchanges(records, lastSeenSequenceNumber); int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); shardIteratorHandler.updateShardIterator(result.getNextShardIterator()); - if (!result.getRecords().isEmpty()) { - lastSeenSequenceNumber = result.getRecords().get((result.getRecords().size()-1)).getDynamodb().getSequenceNumber(); + if (!records.isEmpty()) { + lastSeenSequenceNumber = records.get(records.size() - 1).getDynamodb().getSequenceNumber(); } return processedExchangeCount; @@ -121,6 +122,7 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { condition = BigIntComparisons.Conditions.LTEQ; providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber()); break; + default: } for (Record record : records) { BigInteger recordSeqNum = new BigInteger(record.getDynamodb().getSequenceNumber()); http://git-wip-us.apache.org/repos/asf/camel/blob/2f1c8599/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java index 1d20f8b..de6d242 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardIteratorHandler.java @@ -47,21 +47,22 @@ class ShardIteratorHandler { ShardIteratorType iteratorType = getEndpoint().getIteratorType(); String sequenceNumber = getEndpoint().getSequenceNumber(); if (resumeFromSequenceNumber != null) { - iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; + // Reset things as we're in an error condition. currentShard = null; currentShardIterator = null; + iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; sequenceNumber = resumeFromSequenceNumber; } // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { - ListStreamsRequest req0 = new ListStreamsRequest() - .withTableName(getEndpoint().getTableName()); - ListStreamsResult res0 = getClient().listStreams(req0); - final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream - DescribeStreamRequest req1 = new DescribeStreamRequest() - .withStreamArn(streamArn); - DescribeStreamResult res1 = getClient().describeStream(req1); - shardList.addAll(res1.getStreamDescription().getShards()); + ListStreamsResult streamsListResult = getClient().listStreams( + new ListStreamsRequest().withTableName(getEndpoint().getTableName()) + ); + final String streamArn = streamsListResult.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream + DescribeStreamResult streamDescriptionResult = getClient().describeStream( + new DescribeStreamRequest().withStreamArn(streamArn) + ); + shardList.addAll(streamDescriptionResult.getStreamDescription().getShards()); LOG.trace("Current shard is: {} (in {})", currentShard, shardList); if (currentShard == null) { @@ -71,40 +72,47 @@ class ShardIteratorHandler { } shardList.removeOlderThan(currentShard); LOG.trace("Next shard is: {} (in {})", currentShard, shardList); - GetShardIteratorRequest req = new GetShardIteratorRequest() - .withStreamArn(streamArn) - .withShardId(currentShard.getShardId()) - .withShardIteratorType(iteratorType); - if (getEndpoint().getIteratorType() == ShardIteratorType.AFTER_SEQUENCE_NUMBER - || getEndpoint().getIteratorType() == ShardIteratorType.AFTER_SEQUENCE_NUMBER - || resumeFromSequenceNumber != null - ) { - // if you request with a sequence number that is LESS than the - // start of the shard, you get a HTTP 400 from AWS. - // So only add the sequence number if the endpoints - // sequence number is less than or equal to the starting - // sequence for the shard. - // Otherwise change the shart iterator type to trim_horizon - // because we get a 400 when we use one of the - // {at,after}_sequence_number iterator types and don't supply - // a sequence number. - if (BigIntComparisons.Conditions.LTEQ.matches( - new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), - new BigInteger(sequenceNumber) - )) { - req = req.withSequenceNumber(sequenceNumber); - } else { - req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); - } - } - GetShardIteratorResult result = getClient().getShardIterator(req); + GetShardIteratorResult result = getClient().getShardIterator( + buildGetShardIteratorRequest(streamArn, iteratorType, sequenceNumber) + ); currentShardIterator = result.getShardIterator(); } LOG.trace("Shard Iterator is: {}", currentShardIterator); return currentShardIterator; } + private GetShardIteratorRequest buildGetShardIteratorRequest(final String streamArn, ShardIteratorType iteratorType, String sequenceNumber) { + GetShardIteratorRequest req = new GetShardIteratorRequest() + .withStreamArn(streamArn) + .withShardId(currentShard.getShardId()) + .withShardIteratorType(iteratorType); + switch (iteratorType) { + case AFTER_SEQUENCE_NUMBER: + case AT_SEQUENCE_NUMBER: + // if you request with a sequence number that is LESS than the + // start of the shard, you get a HTTP 400 from AWS. + // So only add the sequence number if the endpoints + // sequence number is less than or equal to the starting + // sequence for the shard. + // Otherwise change the shart iterator type to trim_horizon + // because we get a 400 when we use one of the + // {at,after}_sequence_number iterator types and don't supply + // a sequence number. + if (BigIntComparisons.Conditions.LTEQ.matches( + new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), + new BigInteger(sequenceNumber) + )) { + req = req.withSequenceNumber(sequenceNumber); + } else { + req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); + } + break; + default: + } + return req; + } + private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) { switch(type) { case AFTER_SEQUENCE_NUMBER: