Add filtering of the records when asking for an {at,after}_sequence_number 
iterator type.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8e05657d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8e05657d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8e05657d

Branch: refs/heads/master
Commit: 8e05657d6a742bb6b8b0026c9683953c38168a66
Parents: b2430c0
Author: Candle <can...@candle.me.uk>
Authored: Mon Dec 21 18:29:38 2015 +0000
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100

----------------------------------------------------------------------
 .../aws/ddbstream/AtAfterCondition.java         | 44 +++++++++++++
 .../aws/ddbstream/DdbStreamConsumer.java        | 45 ++++++++++++-
 .../component/aws/ddbstream/ShardList.java      | 24 +------
 .../aws/ddbstream/AtAfterConditionTest.java     | 67 ++++++++++++++++++++
 .../aws/ddbstream/DdbStreamConsumerTest.java    | 56 +++++++++++++++-
 5 files changed, 212 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
new file mode 100644
index 0000000..a6798b5
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+
+interface AtAfterCondition {
+
+    /**
+     * @return true if sequenceNumber is (at,after) the endpointSequenceNumber.
+     */
+    boolean matches(BigInteger endpointSequenceNumber, BigInteger 
sequenceNumber);
+
+    static enum Conditions implements AtAfterCondition {
+        AFTER() {
+            @Override
+            public boolean matches(BigInteger endpointSequenceNumber, 
BigInteger sequenceNumber) {
+                return endpointSequenceNumber.compareTo(sequenceNumber) < 0;
+            }
+        },
+
+        AT() {
+            @Override
+            public boolean matches(BigInteger endpointSequenceNumber, 
BigInteger sequenceNumber) {
+                return endpointSequenceNumber.compareTo(sequenceNumber) <= 0;
+            }
+        }
+        // TODO rename to LT/LTEQ/EQ/GTEQ/GT
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 25e5f31..d520abf 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws.ddbstream;
 
+import java.math.BigInteger;
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Queue;
@@ -31,6 +32,7 @@ import 
com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
 import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
 import com.amazonaws.services.dynamodbv2.model.Record;
 import com.amazonaws.services.dynamodbv2.model.Shard;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -132,6 +134,30 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
                     .withStreamArn(streamArn)
                     .withShardId(currentShard.getShardId())
                     .withShardIteratorType(getEndpoint().getIteratorType());
+            switch(getEndpoint().getIteratorType()) {
+            case AFTER_SEQUENCE_NUMBER:
+            case AT_SEQUENCE_NUMBER:
+                // if you request with a sequence number that is LESS than the
+                // start of the shard, you get a HTTP 400 from AWS.
+                // So only add the sequence number if the endpoints
+                // sequence number is AT or AFTER the starting sequence for
+                // the shard.
+                // Otherwise change the shart iterator type to trim_horizon
+                // because we get a 400 when we use one of the
+                // {at,after}_sequence_number iterator types and don't supply
+                // a sequence number.
+                if (AtAfterCondition.Conditions.AT.matches(
+                        new 
BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
+                        new BigInteger(getEndpoint().getSequenceNumber())
+                )) {
+                    req = 
req.withSequenceNumber(getEndpoint().getSequenceNumber())
+                        
.withShardIteratorType(getEndpoint().getIteratorType());
+                } else {
+                    req = 
req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
+                }
+                break;
+            default:
+            }
             GetShardIteratorResult result = getClient().getShardIterator(req);
             currentShardIterator = result.getShardIterator();
         }
@@ -141,8 +167,25 @@ public class DdbStreamConsumer extends 
ScheduledBatchPollingConsumer {
 
     private Queue<Exchange> createExchanges(List<Record> records) {
         Queue<Exchange> exchanges = new ArrayDeque<>();
+        AtAfterCondition condition;
+        BigInteger providedSeqNum = null;
+        switch(getEndpoint().getIteratorType()) {
+        case AFTER_SEQUENCE_NUMBER:
+            condition = AtAfterCondition.Conditions.AFTER;
+            providedSeqNum = new 
BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
+            break;
+        case AT_SEQUENCE_NUMBER:
+            condition = AtAfterCondition.Conditions.AT;
+            providedSeqNum = new 
BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
+            break;
+        default:
+            condition = null;
+        }
         for (Record record : records) {
-            exchanges.add(getEndpoint().createExchange(record));
+            BigInteger recordSeqNum = new 
BigInteger(record.getDynamodb().getSequenceNumber());
+            if (condition == null || condition.matches(providedSeqNum, 
recordSeqNum)) {
+                exchanges.add(getEndpoint().createExchange(record));
+            }
         }
         return exchanges;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 0a6e332..5c5bd29 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -79,11 +79,11 @@ class ShardList {
     }
 
     Shard afterSeq(String sequenceNumber) {
-        return atAfterSeq(sequenceNumber, After.INSTANCE);
+        return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AFTER);
     }
 
     Shard atSeq(String sequenceNumber) {
-        return atAfterSeq(sequenceNumber, At.INSTANCE);
+        return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AT);
     }
 
     Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
@@ -134,27 +134,9 @@ class ShardList {
         return "ShardList{" + "shards=" + shards + '}';
     }
 
-    private interface AtAfterCondition {
-        boolean matches(BigInteger sequenceNumber, BigInteger option);
-    }
 
-    private static enum After implements AtAfterCondition {
-        INSTANCE() {
-            @Override
-            public boolean matches(BigInteger providedSequenceNumber, 
BigInteger shardSequenceNumber) {
-                return providedSequenceNumber.compareTo(shardSequenceNumber) < 
0;
-            }
-        }
-    }
 
-    private static enum At implements AtAfterCondition {
-        INSTANCE() {
-            @Override
-            public boolean matches(BigInteger providedSequenceNumber, 
BigInteger shardSequenceNumber) {
-                return providedSequenceNumber.compareTo(shardSequenceNumber) 
<= 0;
-            }
-        }
-    }
+
 
     private static enum StartingSequenceNumberComparator implements 
Comparator<Shard> {
         INSTANCE() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
new file mode 100644
index 0000000..53aee40
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(Parameterized.class)
+public class AtAfterConditionTest {
+
+
+    private final AtAfterCondition condition;
+    private final int smaller;
+    private final int bigger;
+    private final boolean result;
+
+    public AtAfterConditionTest(AtAfterCondition condition, int smaller, int 
bigger, boolean result) {
+        this.condition = condition;
+        this.smaller = smaller;
+        this.bigger = bigger;
+        this.result = result;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> parameters() {
+        List<Object[]> results = new ArrayList<>();
+
+        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 5, 
true});
+        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 1, 5, 
true});
+        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 1, 
false});
+        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 1, 1, 
true});
+        results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 5, 1, 
false});
+        results.add(new Object[]{AtAfterCondition.Conditions.AT   , 5, 1, 
false});
+
+        return results;
+    }
+
+    @Test
+    public void test() throws Exception {
+        assertThat(condition.matches(BigInteger.valueOf(smaller), 
BigInteger.valueOf(bigger)), is(result));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
index 3ef6c70..0cbf9ca 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.aws.ddbstream;
 
 import java.util.ArrayList;
@@ -110,12 +126,15 @@ public class DdbStreamConsumerTest {
             @Override
             public GetRecordsResult answer(InvocationOnMock invocation) throws 
Throwable {
                 final String shardIterator = ((GetRecordsRequest) 
invocation.getArguments()[0]).getShardIterator();
-                String nextShardIterator = shardIterators.get(shardIterator); 
// note that HashMap returns null when there is no entry in the map. A null 
'nextShardIterator' indicates that the shard has finished and we should move 
onto the next shard.
+                // note that HashMap returns null when there is no entry in 
the map.
+                // A null 'nextShardIterator' indicates that the shard has 
finished
+                // and we should move onto the next shard.
+                String nextShardIterator = shardIterators.get(shardIterator);
                 Matcher m = 
Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
                 Collection<Record> ans = answers.get(shardIterator);
                 if (nextShardIterator == null && m.matches()) { // last shard 
iterates forever.
                     Integer num = Integer.parseInt(m.group(1));
-                    nextShardIterator = "shard_iterator_d_" + 
pad(Integer.toString(num+1), 3);
+                    nextShardIterator = "shard_iterator_d_" + 
pad(Integer.toString(num + 1), 3);
                 }
                 if (null == ans) { // default to an empty list of records.
                     ans = createRecords();
@@ -227,6 +246,39 @@ public class DdbStreamConsumerTest {
         assertThat(getIteratorCaptor.getValue().getShardId(), is("c"));
     }
 
+    @Test
+    public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+        endpoint.setSequenceNumberProvider(new 
StaticSequenceNumberProvider("35"));
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        for (int i = 0; i < 10; ++i) { // poll lots.
+            undertest.poll();
+        }
+
+        ArgumentCaptor<Exchange> exchangeCaptor = 
ArgumentCaptor.forClass(Exchange.class);
+        verify(processor, times(2)).process(exchangeCaptor.capture(), 
any(AsyncCallback.class));
+
+        
assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
 is("35"));
+        
assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
 is("40"));
+    }
+
+    @Test
+    public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception {
+        endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+        endpoint.setSequenceNumberProvider(new 
StaticSequenceNumberProvider("35"));
+        undertest = new DdbStreamConsumer(endpoint, processor);
+
+        for (int i = 0; i < 10; ++i) { // poll lots.
+            undertest.poll();
+        }
+
+        ArgumentCaptor<Exchange> exchangeCaptor = 
ArgumentCaptor.forClass(Exchange.class);
+        verify(processor, times(1)).process(exchangeCaptor.capture(), 
any(AsyncCallback.class));
+
+        
assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
 is("40"));
+    }
+
     private static Collection<Record> createRecords(String... sequenceNumbers) 
{
         List<Record> results = new ArrayList<>();
 

Reply via email to