Add filtering of the records when asking for an {at,after}_sequence_number iterator type.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8e05657d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8e05657d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8e05657d Branch: refs/heads/master Commit: 8e05657d6a742bb6b8b0026c9683953c38168a66 Parents: b2430c0 Author: Candle <can...@candle.me.uk> Authored: Mon Dec 21 18:29:38 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 24 09:45:32 2015 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/AtAfterCondition.java | 44 +++++++++++++ .../aws/ddbstream/DdbStreamConsumer.java | 45 ++++++++++++- .../component/aws/ddbstream/ShardList.java | 24 +------ .../aws/ddbstream/AtAfterConditionTest.java | 67 ++++++++++++++++++++ .../aws/ddbstream/DdbStreamConsumerTest.java | 56 +++++++++++++++- 5 files changed, 212 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java new file mode 100644 index 0000000..a6798b5 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import java.math.BigInteger; + +interface AtAfterCondition { + + /** + * @return true if sequenceNumber is (at,after) the endpointSequenceNumber. + */ + boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber); + + static enum Conditions implements AtAfterCondition { + AFTER() { + @Override + public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) { + return endpointSequenceNumber.compareTo(sequenceNumber) < 0; + } + }, + + AT() { + @Override + public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) { + return endpointSequenceNumber.compareTo(sequenceNumber) <= 0; + } + } + // TODO rename to LT/LTEQ/EQ/GTEQ/GT + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index 25e5f31..d520abf 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.aws.ddbstream; +import java.math.BigInteger; import java.util.ArrayDeque; import java.util.List; import java.util.Queue; @@ -31,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; import com.amazonaws.services.dynamodbv2.model.Record; import com.amazonaws.services.dynamodbv2.model.Shard; +import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -132,6 +134,30 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { .withStreamArn(streamArn) .withShardId(currentShard.getShardId()) .withShardIteratorType(getEndpoint().getIteratorType()); + switch(getEndpoint().getIteratorType()) { + case AFTER_SEQUENCE_NUMBER: + case AT_SEQUENCE_NUMBER: + // if you request with a sequence number that is LESS than the + // start of the shard, you get a HTTP 400 from AWS. + // So only add the sequence number if the endpoints + // sequence number is AT or AFTER the starting sequence for + // the shard. + // Otherwise change the shart iterator type to trim_horizon + // because we get a 400 when we use one of the + // {at,after}_sequence_number iterator types and don't supply + // a sequence number. + if (AtAfterCondition.Conditions.AT.matches( + new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), + new BigInteger(getEndpoint().getSequenceNumber()) + )) { + req = req.withSequenceNumber(getEndpoint().getSequenceNumber()) + .withShardIteratorType(getEndpoint().getIteratorType()); + } else { + req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); + } + break; + default: + } GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); } @@ -141,8 +167,25 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { private Queue<Exchange> createExchanges(List<Record> records) { Queue<Exchange> exchanges = new ArrayDeque<>(); + AtAfterCondition condition; + BigInteger providedSeqNum = null; + switch(getEndpoint().getIteratorType()) { + case AFTER_SEQUENCE_NUMBER: + condition = AtAfterCondition.Conditions.AFTER; + providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber()); + break; + case AT_SEQUENCE_NUMBER: + condition = AtAfterCondition.Conditions.AT; + providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber()); + break; + default: + condition = null; + } for (Record record : records) { - exchanges.add(getEndpoint().createExchange(record)); + BigInteger recordSeqNum = new BigInteger(record.getDynamodb().getSequenceNumber()); + if (condition == null || condition.matches(providedSeqNum, recordSeqNum)) { + exchanges.add(getEndpoint().createExchange(record)); + } } return exchanges; } http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java index 0a6e332..5c5bd29 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java @@ -79,11 +79,11 @@ class ShardList { } Shard afterSeq(String sequenceNumber) { - return atAfterSeq(sequenceNumber, After.INSTANCE); + return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AFTER); } Shard atSeq(String sequenceNumber) { - return atAfterSeq(sequenceNumber, At.INSTANCE); + return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AT); } Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) { @@ -134,27 +134,9 @@ class ShardList { return "ShardList{" + "shards=" + shards + '}'; } - private interface AtAfterCondition { - boolean matches(BigInteger sequenceNumber, BigInteger option); - } - private static enum After implements AtAfterCondition { - INSTANCE() { - @Override - public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) { - return providedSequenceNumber.compareTo(shardSequenceNumber) < 0; - } - } - } - private static enum At implements AtAfterCondition { - INSTANCE() { - @Override - public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) { - return providedSequenceNumber.compareTo(shardSequenceNumber) <= 0; - } - } - } + private static enum StartingSequenceNumberComparator implements Comparator<Shard> { INSTANCE() { http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java new file mode 100644 index 0000000..53aee40 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + + +@RunWith(Parameterized.class) +public class AtAfterConditionTest { + + + private final AtAfterCondition condition; + private final int smaller; + private final int bigger; + private final boolean result; + + public AtAfterConditionTest(AtAfterCondition condition, int smaller, int bigger, boolean result) { + this.condition = condition; + this.smaller = smaller; + this.bigger = bigger; + this.result = result; + } + + @Parameterized.Parameters + public static Collection<Object[]> parameters() { + List<Object[]> results = new ArrayList<>(); + + results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 5, true}); + results.add(new Object[]{AtAfterCondition.Conditions.AT , 1, 5, true}); + results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 1, false}); + results.add(new Object[]{AtAfterCondition.Conditions.AT , 1, 1, true}); + results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 5, 1, false}); + results.add(new Object[]{AtAfterCondition.Conditions.AT , 5, 1, false}); + + return results; + } + + @Test + public void test() throws Exception { + assertThat(condition.matches(BigInteger.valueOf(smaller), BigInteger.valueOf(bigger)), is(result)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java index 3ef6c70..0cbf9ca 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.camel.component.aws.ddbstream; import java.util.ArrayList; @@ -110,12 +126,15 @@ public class DdbStreamConsumerTest { @Override public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable { final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator(); - String nextShardIterator = shardIterators.get(shardIterator); // note that HashMap returns null when there is no entry in the map. A null 'nextShardIterator' indicates that the shard has finished and we should move onto the next shard. + // note that HashMap returns null when there is no entry in the map. + // A null 'nextShardIterator' indicates that the shard has finished + // and we should move onto the next shard. + String nextShardIterator = shardIterators.get(shardIterator); Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator); Collection<Record> ans = answers.get(shardIterator); if (nextShardIterator == null && m.matches()) { // last shard iterates forever. Integer num = Integer.parseInt(m.group(1)); - nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num+1), 3); + nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num + 1), 3); } if (null == ans) { // default to an empty list of records. ans = createRecords(); @@ -227,6 +246,39 @@ public class DdbStreamConsumerTest { assertThat(getIteratorCaptor.getValue().getShardId(), is("c")); } + @Test + public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception { + endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); + endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); + undertest = new DdbStreamConsumer(endpoint, processor); + + for (int i = 0; i < 10; ++i) { // poll lots. + undertest.poll(); + } + + ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); + verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); + + assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("35")); + assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40")); + } + + @Test + public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception { + endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); + undertest = new DdbStreamConsumer(endpoint, processor); + + for (int i = 0; i < 10; ++i) { // poll lots. + undertest.poll(); + } + + ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); + verify(processor, times(1)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); + + assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40")); + } + private static Collection<Record> createRecords(String... sequenceNumbers) { List<Record> results = new ArrayList<>();