Use the shard list filtering to obtain the first shard when using an {AFTER,AT}_SEQUENCE_NUMBER iterator type.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a25e516 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a25e516 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a25e516 Branch: refs/heads/master Commit: 0a25e51634f848d621d11a6c0abb503f4419bff2 Parents: e2b9d91 Author: Candle <can...@candle.me.uk> Authored: Mon Dec 21 13:35:59 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 24 09:45:31 2015 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/DdbStreamConsumer.java | 8 +- .../aws/ddbstream/DdbStreamEndpoint.java | 38 ++++++- .../aws/ddbstream/SequenceNumberProvider.java | 21 ++++ .../ddbstream/StaticSequenceNumberProvider.java | 31 ++++++ .../StringSequenceNumberConverter.java | 31 ++++++ .../services/org/apache/camel/TypeConverter | 1 + .../aws/ddbstream/DdbStreamEndpointTest.java | 104 +++++++++++++++++++ 7 files changed, 229 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java index f5223c0..25e5f31 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -108,11 +108,17 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { LOG.trace("Current shard is: {} (in {})", currentShard, shardList); if (currentShard == null) { 
switch(getEndpoint().getIteratorType()) { + case AFTER_SEQUENCE_NUMBER: + currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber()); + break; + case AT_SEQUENCE_NUMBER: + currentShard = shardList.atSeq(getEndpoint().getSequenceNumber()); + break; case TRIM_HORIZON: currentShard = shardList.first(); break; - default: case LATEST: + default: currentShard = shardList.last(); break; } http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java index 543c432..bc12bc6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java @@ -49,7 +49,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint { @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream" + " to start getting records. Note that using TRIM_HORIZON can cause a" + " significant delay before the stream has caught up to real-time." 
- + " Currently only LATEST and TRIM_HORIZON are supported.", + + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider" + + " MUST be supplied.", defaultValue = "LATEST") private ShardIteratorType iteratorType = ShardIteratorType.LATEST; // TODO add the ability to use ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER @@ -61,6 +62,11 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint { // Note that the shard list needs to have the ability to start at the shard // that includes the supplied sequence number + @UriParam(label = "consumer", description = "Provider for the sequence number when" + + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER" + + " iterator types. Can be a registry reference or a literal sequence number.") + private SequenceNumberProvider sequenceNumberProvider; + public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) { super(uri, component); this.tableName = tableName; @@ -90,13 +96,30 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint { return true; } + public String getSequenceNumber() { + switch (getIteratorType()) { + case AFTER_SEQUENCE_NUMBER: + case AT_SEQUENCE_NUMBER: + if (null == getSequenceNumberProvider()) { + throw new IllegalStateException("sequenceNumberProvider must be" + + " provided, either as an implementation of" + + " SequenceNumberProvider or a literal String."); + } else { + return getSequenceNumberProvider().getSequenceNumber(); + } + default: + return ""; + } + } + @Override public String toString() { return "DdbStreamEndpoint{" + "tableName=" + tableName + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest - + ", iteratorType=" - + iteratorType + ", uri=" + getEndpointUri() + + ", iteratorType=" + iteratorType + + ", sequenceNumberProvider=" + sequenceNumberProvider + + ", uri=" + getEndpointUri() + '}'; } @@ -135,5 +158,12 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint { public 
void setIteratorType(ShardIteratorType iteratorType) { this.iteratorType = iteratorType; } - + + public SequenceNumberProvider getSequenceNumberProvider() { + return sequenceNumberProvider; + } + + public void setSequenceNumberProvider(SequenceNumberProvider sequenceNumberProvider) { + this.sequenceNumberProvider = sequenceNumberProvider; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java new file mode 100644 index 0000000..5a9dd8c --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.aws.ddbstream; + +public interface SequenceNumberProvider { + String getSequenceNumber(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java new file mode 100644 index 0000000..459767b --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.aws.ddbstream; + +public class StaticSequenceNumberProvider implements SequenceNumberProvider { + + private final String sequenceNumber; + + public StaticSequenceNumberProvider(String sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + @Override + public String getSequenceNumber() { + return sequenceNumber; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java new file mode 100644 index 0000000..92bae2b --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.aws.ddbstream; + +import org.apache.camel.Converter; + +@Converter +public class StringSequenceNumberConverter { + + private StringSequenceNumberConverter() { + } + + @Converter + public static SequenceNumberProvider toSequenceNumberProvider(String sequenceNumber) { + return new StaticSequenceNumberProvider(sequenceNumber); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter index 4873a46..4472b59 100644 --- a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter +++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter @@ -16,3 +16,4 @@ ## --------------------------------------------------------------------------- org.apache.camel.component.aws.kinesis.RecordStringConverter +org.apache.camel.component.aws.ddbstream.StringSequenceNumberConverter http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java new file mode 100644 index 0000000..9e688be --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import org.junit.Test; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DdbStreamEndpointTest { + + private CamelContext context; + + @Mock private SequenceNumberProvider sequenceNumberProvider; + @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams; + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + registry.put("someSeqNumProv", sequenceNumberProvider); + registry.put("ddbStreamsClient", amazonDynamoDBStreams); + + 
context = new DefaultCamelContext(registry); + } + + @Test + public void itExtractsTheSequenceNumber() throws Exception { + when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq"); + + DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table" + + "?amazonDynamoDbStreamsClient=#ddbStreamsClient" + + "&iteratorType=AFTER_SEQUENCE_NUMBER" + + "&sequenceNumberProvider=#someSeqNumProv"); + + assertThat(undertest.getSequenceNumber(), is("seq")); + } + + @Test + public void itExtractsTheSequenceNumberFromALiteralString() throws Exception { + + DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table" + + "?amazonDynamoDbStreamsClient=#ddbStreamsClient" + + "&iteratorType=AFTER_SEQUENCE_NUMBER" + + "&sequenceNumberProvider=seq"); + + assertThat(undertest.getSequenceNumber(), is("seq")); + } + + @Test + public void onSequenceNumberAgnosticIteratorsTheProviderIsIgnored() throws Exception { + when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq"); + + DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table" + + "?amazonDynamoDbStreamsClient=#ddbStreamsClient" + + "&iteratorType=LATEST" + + "&sequenceNumberProvider=#someSeqNumProv"); + + assertThat(undertest.getSequenceNumber(), is("")); + verify(sequenceNumberProvider, never()).getSequenceNumber(); + } + + @Test + public void sequenceNumberFetchingThrowsSomethingUsefulIfMisconfigurered() throws Exception { + when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq"); + + expectedException.expectMessage(containsString("sequenceNumberProvider")); + + DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table" + + "?amazonDynamoDbStreamsClient=#ddbStreamsClient" + + "&iteratorType=AT_SEQUENCE_NUMBER"); // NOTE: missing sequence number provider parameter + + undertest.getSequenceNumber(); + } +} \ No newline at end of file