Add {at,after}-sequence-number symantics to the shard list container
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e2b9d91c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e2b9d91c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e2b9d91c Branch: refs/heads/master Commit: e2b9d91c3428e659c36799466de7abe709c4d941 Parents: 0350423 Author: Candle <can...@candle.me.uk> Authored: Wed Dec 16 14:58:02 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 24 09:45:31 2015 +0100 ---------------------------------------------------------------------- .../component/aws/ddbstream/ShardList.java | 67 ++++++++++++++++++++ .../ShardListAfterSequenceParametrised.java | 57 +++++++++++++++++ .../ShardListAtSequenceParametrised.java | 57 +++++++++++++++++ .../component/aws/ddbstream/ShardListTest.java | 24 ++++++- 4 files changed, 204 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java index a0df179..3ae1c4e 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java @@ -21,6 +21,11 @@ import java.util.HashMap; import java.util.Map; import com.amazonaws.services.dynamodbv2.model.Shard; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +75,35 @@ class ShardList { throw new IllegalStateException("Unable to find a shard with no children " + shards); } + Shard afterSeq(String sequenceNumber) { + return atAfterSeq(sequenceNumber, After.INSTANCE); + } + + Shard atSeq(String sequenceNumber) { + return atAfterSeq(sequenceNumber, At.INSTANCE); + } + + Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) { + BigInteger atAfter = new BigInteger(sequenceNumber); + List<Shard> sorted = new ArrayList<>(); + sorted.addAll(shards.values()); + Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE); + for (Shard shard : sorted) { + if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) { + BigInteger end = new BigInteger(shard.getSequenceNumberRange().getEndingSequenceNumber()); + // essentially: after < end or after <= end + if (condition.matches(atAfter, end)) { + return shard; + } + + } + } + if (shards.size() > 0) { + return sorted.get(sorted.size()-1); + } + throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards); + } + /** * Removes shards that are older than the provided shard. * Does not remove the provided shard. @@ -95,4 +129,37 @@ class ShardList { public String toString() { return "ShardList{" + "shards=" + shards + '}'; } + + private interface AtAfterCondition { + boolean matches(BigInteger sequenceNumber, BigInteger option); + } + + private static enum After implements AtAfterCondition { + INSTANCE() { + @Override + public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) { + return providedSequenceNumber.compareTo(shardSequenceNumber) < 0; + } + } + } + + private static enum At implements AtAfterCondition { + INSTANCE() { + @Override + public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) { + return providedSequenceNumber.compareTo(shardSequenceNumber) <= 0; + } + } + } + + private static enum StartingSequenceNumberComparator implements Comparator<Shard> { + INSTANCE() { + @Override + public int compare(Shard o1, Shard o2) { + BigInteger i1 = new BigInteger(o1.getSequenceNumberRange().getStartingSequenceNumber()); + BigInteger i2 = new BigInteger(o2.getSequenceNumberRange().getStartingSequenceNumber()); + return i1.compareTo(i2); + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java new file mode 100644 index 0000000..c33ebe2 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java @@ -0,0 +1,57 @@ +package org.apache.camel.component.aws.ddbstream; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ShardListAfterSequenceParametrised { + + + @Parameterized.Parameters + public static Collection<Object[]> paramaters() { + List<Object[]> results = new ArrayList<>(); + results.add(new Object[]{"0", "a"}); + results.add(new Object[]{"3", "a"}); + results.add(new Object[]{"6", "b"}); + results.add(new Object[]{"8", "b"}); + results.add(new Object[]{"15", "c"}); + results.add(new Object[]{"16", "d"}); + results.add(new Object[]{"18", "d"}); + results.add(new Object[]{"25", "d"}); + results.add(new Object[]{"30", "d"}); + return results; + } + + private ShardList undertest; + + private final String inputSequenceNumber; + private final String expectedShardId; + + public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) { + this.inputSequenceNumber = inputSequenceNumber; + this.expectedShardId = expectedShardId; + } + + @Before + public void setup() throws Exception { + undertest = new ShardList(); + undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null, + "a", "1", "5", + "b", "8", "15", + "c", "16", "16", + "d", "20", null + )); + } + + @Test + public void assertions() throws Exception { + assertThat(undertest.afterSeq(inputSequenceNumber).getShardId(), is(expectedShardId)); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java new file mode 100644 index 0000000..cf15021 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java @@ -0,0 +1,57 @@ +package org.apache.camel.component.aws.ddbstream; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ShardListAtSequenceParametrised { + + + @Parameterized.Parameters + public static Collection<Object[]> paramaters() { + List<Object[]> results = new ArrayList<>(); + results.add(new Object[]{"0", "a"}); + results.add(new Object[]{"3", "a"}); + results.add(new Object[]{"6", "b"}); + results.add(new Object[]{"8", "b"}); + results.add(new Object[]{"15", "b"}); + results.add(new Object[]{"16", "c"}); + results.add(new Object[]{"18", "d"}); + results.add(new Object[]{"25", "d"}); + results.add(new Object[]{"30", "d"}); + return results; + } + + private ShardList undertest; + + private final String inputSequenceNumber; + private final String expectedShardId; + + public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) { + this.inputSequenceNumber = inputSequenceNumber; + this.expectedShardId = expectedShardId; + } + + @Before + public void setup() throws Exception { + undertest = new ShardList(); + undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null, + "a", "1", "5", + "b", "8", "15", + "c", "16", "16", + "d", "20", null + )); + } + + @Test + public void assertions() throws Exception { + assertThat(undertest.atSeq(inputSequenceNumber).getShardId(), is(expectedShardId)); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java index 1b7249a..e181d7e 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws.ddbstream; +import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange; import java.util.ArrayList; import java.util.List; import com.amazonaws.services.dynamodbv2.model.Shard; @@ -24,6 +25,7 @@ import com.amazonaws.services.dynamodbv2.model.Shard; import org.junit.Test; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import org.junit.Ignore; public class ShardListTest { @@ -118,7 +120,27 @@ public class ShardListTest { assertThat(shards.first().getShardId(), is("c")); } - List<Shard> createShards(String initialParent, String... shardIds) { + static List<Shard> createShardsWithSequenceNumbers(String initialParent, String... shardIdsAndSeqNos) { + String previous = initialParent; + List<Shard> result = new ArrayList<>(); + for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) { + String id = shardIdsAndSeqNos[i]; + String seqStart = shardIdsAndSeqNos[i+1]; + String seqEnd = shardIdsAndSeqNos[i+2]; + result.add(new Shard() + .withShardId(id) + .withParentShardId(previous) + .withSequenceNumberRange(new SequenceNumberRange() + .withStartingSequenceNumber(seqStart) + .withEndingSequenceNumber(seqEnd) + ) + ); + previous = id; + } + return result; + } + + static List<Shard> createShards(String initialParent, String... shardIds) { String previous = initialParent; List<Shard> result = new ArrayList<>(); for (String s : shardIds) {