This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 49039bb5afc12a083311b5bac56018b2b5d7be84 Author: Pierre-Yves B <pierre-yves.bigour...@bbc.co.uk> AuthorDate: Wed Sep 22 14:32:52 2021 +0100 CAMEL-16594 Support more than one active shard in DynamoDB stream. --- .../ddbstream/Ddb2StreamComponentConfigurer.java | 18 +-- .../ddbstream/Ddb2StreamEndpointConfigurer.java | 18 +-- .../ddbstream/Ddb2StreamEndpointUriFactory.java | 5 +- .../StringSequenceNumberConverterLoader.java | 56 ------- .../services/org/apache/camel/TypeConverterLoader | 2 - .../component/aws2/ddbstream/aws2-ddbstream.json | 6 +- .../aws2/ddbstream/Ddb2StreamConfiguration.java | 38 ++--- .../aws2/ddbstream/Ddb2StreamConsumer.java | 95 +++++------- .../aws2/ddbstream/Ddb2StreamEndpoint.java | 21 +-- .../aws2/ddbstream/SequenceNumberProvider.java | 21 --- .../aws2/ddbstream/ShardIteratorHandler.java | 166 ++++++++++----------- .../camel/component/aws2/ddbstream/ShardTree.java | 68 +++++++++ .../ddbstream/StringSequenceNumberConverter.java | 32 ---- .../ddbstream/AmazonDDBStreamlessClientMock.java} | 20 ++- .../aws2/ddbstream/AmazonDDBStreamsClientMock.java | 73 +++++++++ .../component/aws2/ddbstream/ShardFixtures.java | 74 +++++++++ .../aws2/ddbstream/ShardIteratorHandlerTest.java | 161 ++++++++++++++++++++ .../component/aws2/ddbstream/ShardTreeTest.java | 75 ++++++++++ .../dsl/Aws2DdbstreamComponentBuilderFactory.java | 40 ++--- .../dsl/Ddb2StreamEndpointBuilderFactory.java | 94 ++++-------- 20 files changed, 654 insertions(+), 429 deletions(-) diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java index 28e2949..35ff908 100644 --- a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java +++ b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java @@ -37,8 +37,6 @@ public class Ddb2StreamComponentConfigurer extends PropertyConfigurerSupport imp case "bridgeerrorhandler": case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true; case "configuration": target.setConfiguration(property(camelContext, org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.class, value)); return true; - case "iteratortype": - case "iteratorType": getOrCreateConfiguration(target).setIteratorType(property(camelContext, software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class, value)); return true; case "maxresultsperrequest": case "maxResultsPerRequest": getOrCreateConfiguration(target).setMaxResultsPerRequest(property(camelContext, int.class, value)); return true; case "overrideendpoint": @@ -52,8 +50,8 @@ public class Ddb2StreamComponentConfigurer extends PropertyConfigurerSupport imp case "region": getOrCreateConfiguration(target).setRegion(property(camelContext, java.lang.String.class, value)); return true; case "secretkey": case "secretKey": getOrCreateConfiguration(target).setSecretKey(property(camelContext, java.lang.String.class, value)); return true; - case "sequencenumberprovider": - case "sequenceNumberProvider": getOrCreateConfiguration(target).setSequenceNumberProvider(property(camelContext, org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, value)); return true; + case "streamiteratortype": + case "streamIteratorType": getOrCreateConfiguration(target).setStreamIteratorType(property(camelContext, org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class, value)); return true; case "trustallcertificates": case "trustAllCertificates": getOrCreateConfiguration(target).setTrustAllCertificates(property(camelContext, boolean.class, value)); return true; case "uriendpointoverride": @@ -81,8 +79,6 @@ public class Ddb2StreamComponentConfigurer extends PropertyConfigurerSupport imp case "bridgeerrorhandler": case "bridgeErrorHandler": return boolean.class; case "configuration": return org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.class; - case "iteratortype": - case "iteratorType": return software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class; case "maxresultsperrequest": case "maxResultsPerRequest": return int.class; case "overrideendpoint": @@ -96,8 +92,8 @@ public class Ddb2StreamComponentConfigurer extends PropertyConfigurerSupport imp case "region": return java.lang.String.class; case "secretkey": case "secretKey": return java.lang.String.class; - case "sequencenumberprovider": - case "sequenceNumberProvider": return org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class; + case "streamiteratortype": + case "streamIteratorType": return org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class; case "trustallcertificates": case "trustAllCertificates": return boolean.class; case "uriendpointoverride": @@ -121,8 +117,6 @@ public class Ddb2StreamComponentConfigurer extends PropertyConfigurerSupport imp case "bridgeerrorhandler": case "bridgeErrorHandler": return target.isBridgeErrorHandler(); case "configuration": return target.getConfiguration(); - case "iteratortype": - case "iteratorType": return getOrCreateConfiguration(target).getIteratorType(); case "maxresultsperrequest": case "maxResultsPerRequest": return getOrCreateConfiguration(target).getMaxResultsPerRequest(); case "overrideendpoint": @@ -136,8 +130,8 @@ public class Ddb2StreamComponentConfigurer extends PropertyConfigurerSupport imp case "region": return getOrCreateConfiguration(target).getRegion(); case "secretkey": case "secretKey": return getOrCreateConfiguration(target).getSecretKey(); - case "sequencenumberprovider": - case "sequenceNumberProvider": return getOrCreateConfiguration(target).getSequenceNumberProvider(); + case "streamiteratortype": + case "streamIteratorType": return getOrCreateConfiguration(target).getStreamIteratorType(); case "trustallcertificates": case "trustAllCertificates": return getOrCreateConfiguration(target).isTrustAllCertificates(); case "uriendpointoverride": diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java index 4c4af9b..4fde0a8 100644 --- a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java +++ b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java @@ -41,8 +41,6 @@ public class Ddb2StreamEndpointConfigurer extends PropertyConfigurerSupport impl case "greedy": target.setGreedy(property(camelContext, boolean.class, value)); return true; case "initialdelay": case "initialDelay": target.setInitialDelay(property(camelContext, long.class, value)); return true; - case "iteratortype": - case "iteratorType": target.getConfiguration().setIteratorType(property(camelContext, software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class, value)); return true; case "maxresultsperrequest": case "maxResultsPerRequest": target.getConfiguration().setMaxResultsPerRequest(property(camelContext, int.class, value)); return true; case "overrideendpoint": @@ -69,10 +67,10 @@ public class Ddb2StreamEndpointConfigurer extends PropertyConfigurerSupport impl case "secretKey": target.getConfiguration().setSecretKey(property(camelContext, java.lang.String.class, value)); return true; case "sendemptymessagewhenidle": case "sendEmptyMessageWhenIdle": target.setSendEmptyMessageWhenIdle(property(camelContext, boolean.class, value)); return true; - case "sequencenumberprovider": - case "sequenceNumberProvider": target.getConfiguration().setSequenceNumberProvider(property(camelContext, org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, value)); return true; case "startscheduler": case "startScheduler": target.setStartScheduler(property(camelContext, boolean.class, value)); return true; + case "streamiteratortype": + case "streamIteratorType": target.getConfiguration().setStreamIteratorType(property(camelContext, org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class, value)); return true; case "timeunit": case "timeUnit": target.setTimeUnit(property(camelContext, java.util.concurrent.TimeUnit.class, value)); return true; case "trustallcertificates": @@ -115,8 +113,6 @@ public class Ddb2StreamEndpointConfigurer extends PropertyConfigurerSupport impl case "greedy": return boolean.class; case "initialdelay": case "initialDelay": return long.class; - case "iteratortype": - case "iteratorType": return software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class; case "maxresultsperrequest": case "maxResultsPerRequest": return int.class; case "overrideendpoint": @@ -143,10 +139,10 @@ public class Ddb2StreamEndpointConfigurer extends PropertyConfigurerSupport impl case "secretKey": return java.lang.String.class; case "sendemptymessagewhenidle": case "sendEmptyMessageWhenIdle": return boolean.class; - case "sequencenumberprovider": - case "sequenceNumberProvider": return org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class; case "startscheduler": case "startScheduler": return boolean.class; + case "streamiteratortype": + case "streamIteratorType": return org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class; case "timeunit": case "timeUnit": return java.util.concurrent.TimeUnit.class; case "trustallcertificates": @@ -185,8 +181,6 @@ public class Ddb2StreamEndpointConfigurer extends PropertyConfigurerSupport impl case "greedy": return target.isGreedy(); case "initialdelay": case "initialDelay": return target.getInitialDelay(); - case "iteratortype": - case "iteratorType": return target.getConfiguration().getIteratorType(); case "maxresultsperrequest": case "maxResultsPerRequest": return target.getConfiguration().getMaxResultsPerRequest(); case "overrideendpoint": @@ -213,10 +207,10 @@ public class Ddb2StreamEndpointConfigurer extends PropertyConfigurerSupport impl case "secretKey": return target.getConfiguration().getSecretKey(); case "sendemptymessagewhenidle": case "sendEmptyMessageWhenIdle": return target.isSendEmptyMessageWhenIdle(); - case "sequencenumberprovider": - case "sequenceNumberProvider": return target.getConfiguration().getSequenceNumberProvider(); case "startscheduler": case "startScheduler": return target.isStartScheduler(); + case "streamiteratortype": + case "streamIteratorType": return target.getConfiguration().getStreamIteratorType(); case "timeunit": case "timeUnit": return target.getTimeUnit(); case "trustallcertificates": diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java index 9a55969..3e374ab 100644 --- a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java +++ b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java @@ -20,7 +20,7 @@ public class Ddb2StreamEndpointUriFactory extends org.apache.camel.support.compo private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(34); + Set<String> props = new HashSet<>(33); props.add("backoffMultiplier"); props.add("initialDelay"); props.add("amazonDynamoDbStreamsClient"); @@ -37,14 +37,12 @@ public class Ddb2StreamEndpointUriFactory extends org.apache.camel.support.compo props.add("timeUnit"); props.add("proxyProtocol"); props.add("secretKey"); - props.add("iteratorType"); props.add("sendEmptyMessageWhenIdle"); props.add("schedulerProperties"); props.add("uriEndpointOverride"); props.add("exchangePattern"); props.add("useDefaultCredentialsProvider"); props.add("proxyHost"); - props.add("sequenceNumberProvider"); props.add("backoffIdleThreshold"); props.add("trustAllCertificates"); props.add("delay"); @@ -53,6 +51,7 @@ public class Ddb2StreamEndpointUriFactory extends org.apache.camel.support.compo props.add("accessKey"); props.add("overrideEndpoint"); props.add("maxResultsPerRequest"); + props.add("streamIteratorType"); props.add("region"); props.add("exceptionHandler"); PROPERTY_NAMES = Collections.unmodifiableSet(props); diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java deleted file mode 100644 index 3ab0a19..0000000 --- a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java +++ /dev/null @@ -1,56 +0,0 @@ -/* Generated by camel build tools - do NOT edit this file! */ -package org.apache.camel.component.aws2.ddbstream; - -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.DeferredContextBinding; -import org.apache.camel.Exchange; -import org.apache.camel.TypeConversionException; -import org.apache.camel.TypeConverterLoaderException; -import org.apache.camel.spi.TypeConverterLoader; -import org.apache.camel.spi.TypeConverterRegistry; -import org.apache.camel.support.SimpleTypeConverter; -import org.apache.camel.support.TypeConverterSupport; -import org.apache.camel.util.DoubleMap; - -/** - * Generated by camel build tools - do NOT edit this file! - */ -@SuppressWarnings("unchecked") -@DeferredContextBinding -public final class StringSequenceNumberConverterLoader implements TypeConverterLoader, CamelContextAware { - - private CamelContext camelContext; - - public StringSequenceNumberConverterLoader() { - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - @Override - public CamelContext getCamelContext() { - return camelContext; - } - - @Override - public void load(TypeConverterRegistry registry) throws TypeConverterLoaderException { - try { - registerConverters(registry); - } catch (Throwable e) { - // ignore on load error - } - } - - private void registerConverters(TypeConverterRegistry registry) { - addTypeConverter(registry, org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, java.lang.String.class, false, - (type, exchange, value) -> org.apache.camel.component.aws2.ddbstream.StringSequenceNumberConverter.toSequenceNumberProvider((java.lang.String) value)); - } - - private static void addTypeConverter(TypeConverterRegistry registry, Class<?> toType, Class<?> fromType, boolean allowNull, SimpleTypeConverter.ConversionMethod method) { - registry.addTypeConverter(toType, fromType, new SimpleTypeConverter(allowNull, method)); - } - -} diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader deleted file mode 100644 index d5cbd3a..0000000 --- a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader +++ /dev/null @@ -1,2 +0,0 @@ -# Generated by camel build tools - do NOT edit this file! -org.apache.camel.component.aws2.ddbstream.StringSequenceNumberConverterLoader diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json b/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json index ce2fbf3..533bd93 100644 --- a/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json +++ b/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json @@ -25,14 +25,13 @@ "amazonDynamoDbStreamsClient": { "kind": "property", "displayName": "Amazon Dynamo Db Streams Client", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "descripti [...] "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "configuration": { "kind": "property", "displayName": "Configuration", "group": "consumer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "deprecated": false, "autowired": false, "secret": false, "description": "The component configuration" }, - "iteratorType": { "kind": "property", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.dynamodb.model.ShardIteratorType", "enum": [ "TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfigu [...] "maxResultsPerRequest": { "kind": "property", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, "overrideEndpoint": { "kind": "property", "displayName": "Override Endpoint", "group": "consumer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Set the need for overidding the endpoint. This option needs to be used in combination wi [...] "proxyHost": { "kind": "property", "displayName": "Proxy Host", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy host when instantiating the DDBStreams client" }, "proxyPort": { "kind": "property", "displayName": "Proxy Port", "group": "consumer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy port when instantiating the DDBStreams client" }, "proxyProtocol": { "kind": "property", "displayName": "Proxy Protocol", "group": "consumer", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy protocol when ins [...] "region": { "kind": "property", "displayName": "Region", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "The region in which DDBStreams client needs to work" }, - "sequenceNumberProvider": { "kind": "property", "displayName": "Sequence Number Provider", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Provider for the sequence number w [...] + "streamIteratorType": { "kind": "property", "displayName": "Stream Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType", "enum": [ "FROM_LATEST", "FROM_START" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "FROM_LATEST", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "conf [...] "trustAllCertificates": { "kind": "property", "displayName": "Trust All Certificates", "group": "consumer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "If we want to trust all certificates in case of overriding the endpoint" }, "uriEndpointOverride": { "kind": "property", "displayName": "Uri Endpoint Override", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoi [...] "useDefaultCredentialsProvider": { "kind": "property", "displayName": "Use Default Credentials Provider", "group": "consumer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Set whether the DynamoDB Streams client should expect to loa [...] @@ -44,7 +43,6 @@ "tableName": { "kind": "path", "displayName": "Table Name", "group": "consumer", "label": "consumer", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Name of the dynamodb table" }, "amazonDynamoDbStreamsClient": { "kind": "parameter", "displayName": "Amazon Dynamo Db Streams Client", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "descript [...] "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] - "iteratorType": { "kind": "parameter", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.dynamodb.model.ShardIteratorType", "enum": [ "TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfig [...] "maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, "overrideEndpoint": { "kind": "parameter", "displayName": "Override Endpoint", "group": "consumer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Set the need for overidding the endpoint. This option needs to be used in combination w [...] "proxyHost": { "kind": "parameter", "displayName": "Proxy Host", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy host when instantiating the DDBStreams client" }, @@ -52,7 +50,7 @@ "proxyProtocol": { "kind": "parameter", "displayName": "Proxy Protocol", "group": "consumer", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "To define a proxy protocol when in [...] "region": { "kind": "parameter", "displayName": "Region", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "The region in which DDBStreams client needs to work" }, "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send Empty Message When Idle", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead." }, - "sequenceNumberProvider": { "kind": "parameter", "displayName": "Sequence Number Provider", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Provider for the sequence number [...] + "streamIteratorType": { "kind": "parameter", "displayName": "Stream Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType", "enum": [ "FROM_LATEST", "FROM_START" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "FROM_LATEST", "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "con [...] "trustAllCertificates": { "kind": "parameter", "displayName": "Trust All Certificates", "group": "consumer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "If we want to trust all certificates in case of overriding the endpoint" }, "uriEndpointOverride": { "kind": "parameter", "displayName": "Uri Endpoint Override", "group": "consumer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpo [...] "useDefaultCredentialsProvider": { "kind": "parameter", "displayName": "Use Default Credentials Provider", "group": "consumer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "configurationField": "configuration", "description": "Set whether the DynamoDB Streams client should expect to lo [...] diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java index 06e7933..9a78be3 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java @@ -22,7 +22,6 @@ import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; import software.amazon.awssdk.core.Protocol; -import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; @UriParams @@ -47,18 +46,12 @@ public class Ddb2StreamConfiguration implements Cloneable { private int maxResultsPerRequest = 100; @UriParam(label = "consumer", - description = "Defines where in the DynaboDB stream" - + " to start getting records. Note that using TRIM_HORIZON can cause a" - + " significant delay before the stream has caught up to real-time." - + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider" + " MUST be supplied.", - defaultValue = "LATEST") - private ShardIteratorType iteratorType = ShardIteratorType.LATEST; + description = "Defines where in the DynamoDB stream" + + " to start getting records. Note that using FROM_START can cause a" + + " significant delay before the stream has caught up to real-time.", + defaultValue = "FROM_LATEST") + private StreamIteratorType streamIteratorType = StreamIteratorType.FROM_LATEST; - @UriParam(label = "consumer", - description = "Provider for the sequence number when" - + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER" - + " iterator types. Can be a registry reference or a literal sequence number.") - private SequenceNumberProvider sequenceNumberProvider; @UriParam(enums = "HTTP,HTTPS", defaultValue = "HTTPS", description = "To define a proxy protocol when instantiating the DDBStreams client") private Protocol proxyProtocol = Protocol.HTTPS; @@ -127,20 +120,12 @@ public class Ddb2StreamConfiguration implements Cloneable { this.tableName = tableName; } - public ShardIteratorType getIteratorType() { - return iteratorType; - } - - public void setIteratorType(ShardIteratorType iteratorType) { - this.iteratorType = iteratorType; + public StreamIteratorType getStreamIteratorType() { + return streamIteratorType; } - public SequenceNumberProvider getSequenceNumberProvider() { - return sequenceNumberProvider; - } - - public void setSequenceNumberProvider(SequenceNumberProvider sequenceNumberProvider) { - this.sequenceNumberProvider = sequenceNumberProvider; + public void setStreamIteratorType(StreamIteratorType streamIteratorType) { + this.streamIteratorType = streamIteratorType; } public Protocol getProxyProtocol() { @@ -210,4 +195,9 @@ public class Ddb2StreamConfiguration implements Cloneable { throw new RuntimeCamelException(e); } } + + public enum StreamIteratorType { + FROM_LATEST, + FROM_START + } } diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java index b952176..25044fc 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java @@ -16,9 +16,11 @@ */ package org.apache.camel.component.aws2.ddbstream; -import java.math.BigInteger; import java.util.ArrayDeque; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import org.apache.camel.AsyncCallback; @@ -33,14 +35,13 @@ import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.Record; -import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(Ddb2StreamConsumer.class); private final ShardIteratorHandler shardIteratorHandler; - private String lastSeenSequenceNumber; + private final Map<String, String> lastSeenSequenceNumbers = new HashMap<>(); public Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor processor) { this(endpoint, processor, new ShardIteratorHandler(endpoint)); @@ -53,29 +54,41 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { @Override protected int poll() throws Exception { - GetRecordsResponse result; - try { - GetRecordsRequest.Builder req - = GetRecordsRequest.builder().shardIterator(shardIteratorHandler.getShardIterator(null)) - .limit(getEndpoint().getConfiguration().getMaxResultsPerRequest()); - result = getClient().getRecords(req.build()); - } catch (ExpiredIteratorException e) { - LOG.warn("Expired Shard Iterator, attempting to resume from {}", lastSeenSequenceNumber, e); - GetRecordsRequest.Builder req - = GetRecordsRequest.builder().shardIterator(shardIteratorHandler.getShardIterator(lastSeenSequenceNumber)) - .limit(getEndpoint().getConfiguration().getMaxResultsPerRequest()); - result = getClient().getRecords(req.build()); - } - List<Record> records = result.records(); - - Queue<Exchange> exchanges = createExchanges(records, lastSeenSequenceNumber); - int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); + int processedExchangeCount = 0; + Map<String, String> shardIterators = shardIteratorHandler.getShardIterators(); + for (Entry<String, String> shardIteratorEntry : shardIterators.entrySet()) { + int limitPerRecordsRequest = Math.max(1, + getEndpoint().getConfiguration().getMaxResultsPerRequest() / shardIterators.size()); + String shardId = shardIteratorEntry.getKey(); + String shardIterator = shardIteratorEntry.getValue(); + GetRecordsResponse result; + try { + GetRecordsRequest req = GetRecordsRequest.builder() + .shardIterator(shardIterator) + .limit(limitPerRecordsRequest) + .build(); + result = getEndpoint().getClient().getRecords(req); + } catch (ExpiredIteratorException e) { + String lastSeenSequenceNumber = lastSeenSequenceNumbers.get(shardId); + LOG.warn("Expired Shard Iterator, attempting to resume from {}", lastSeenSequenceNumber, e); + GetRecordsRequest req = GetRecordsRequest.builder() + .shardIterator(shardIteratorHandler.requestFreshShardIterator(shardId, lastSeenSequenceNumber)) + .limit(limitPerRecordsRequest) + .build(); + result = getEndpoint().getClient().getRecords(req); + } + List<Record> records = result.records(); + Queue<Exchange> exchanges = new ArrayDeque<>(); + for (Record record : records) { + exchanges.add(createExchange(record)); + } + processedExchangeCount += processBatch(CastUtils.cast(exchanges)); - shardIteratorHandler.updateShardIterator(result.nextShardIterator()); - if (!records.isEmpty()) { - lastSeenSequenceNumber = records.get(records.size() - 1).dynamodb().sequenceNumber(); + shardIteratorHandler.updateShardIterator(shardId, result.nextShardIterator()); + if (!records.isEmpty()) { + lastSeenSequenceNumbers.put(shardId, records.get(records.size() - 1).dynamodb().sequenceNumber()); + } } - return processedExchangeCount; } @@ -99,42 +112,8 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { return ex; } - private DynamoDbStreamsClient getClient() { - return getEndpoint().getClient(); - } - @Override public Ddb2StreamEndpoint getEndpoint() { return (Ddb2StreamEndpoint) super.getEndpoint(); } - - private Queue<Exchange> createExchanges(List<Record> records, String lastSeenSequenceNumber) { - Queue<Exchange> exchanges = new ArrayDeque<>(); - BigIntComparisons condition = null; - BigInteger providedSeqNum = null; - if (lastSeenSequenceNumber != null) { - providedSeqNum = new BigInteger(lastSeenSequenceNumber); - condition = BigIntComparisons.Conditions.LT; - } - switch (getEndpoint().getConfiguration().getIteratorType()) { - case AFTER_SEQUENCE_NUMBER: - condition = BigIntComparisons.Conditions.LT; - providedSeqNum - = new BigInteger(getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber()); - break; - case AT_SEQUENCE_NUMBER: - condition = BigIntComparisons.Conditions.LTEQ; - providedSeqNum - = new BigInteger(getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber()); - break; - default: - } - for (Record record : records) { - BigInteger recordSeqNum = new BigInteger(record.dynamodb().sequenceNumber()); - if (condition == null || condition.matches(providedSeqNum, recordSeqNum)) { - exchanges.add(createExchange(record)); - } - } - return exchanges; - } } diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java index 5dad187..1259207 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java @@ -95,22 +95,6 @@ public class Ddb2StreamEndpoint extends ScheduledPollEndpoint { return ddbStreamClient; } - public String getSequenceNumber() { - switch (configuration.getIteratorType()) { - case AFTER_SEQUENCE_NUMBER: - case AT_SEQUENCE_NUMBER: - if (null == configuration.getSequenceNumberProvider()) { - throw new IllegalStateException( - "sequenceNumberProvider must be" + " provided, either as an implementation of" - + " SequenceNumberProvider or a literal String."); - } else { - return configuration.getSequenceNumberProvider().getSequenceNumber(); - } - default: - return ""; - } - } - DynamoDbStreamsClient createDdbStreamClient() { DynamoDbStreamsClient client = null; DynamoDbStreamsClientBuilder clientBuilder = DynamoDbStreamsClient.builder(); @@ -161,8 +145,7 @@ public class Ddb2StreamEndpoint extends ScheduledPollEndpoint { public String toString() { return "DdbStreamEndpoint{" + "tableName=" + configuration.getTableName() + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" - + configuration.getMaxResultsPerRequest() + ", iteratorType=" + configuration.getIteratorType() - + ", sequenceNumberProvider=" - + configuration.getSequenceNumberProvider() + ", uri=" + getEndpointUri() + '}'; + + configuration.getMaxResultsPerRequest() + ", streamIteratorType=" + configuration.getStreamIteratorType() + + ", uri=" + getEndpointUri() + '}'; } } diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java deleted file mode 100644 index b25fd86..0000000 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.ddbstream; - -public interface SequenceNumberProvider { - String getSequenceNumber(); -} diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java index b3e5940..685ea53 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java @@ -16,14 +16,17 @@ */ package org.apache.camel.component.aws2.ddbstream; -import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest; import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse; import software.amazon.awssdk.services.dynamodb.model.Shard; @@ -34,111 +37,108 @@ class ShardIteratorHandler { private static final Logger LOG = LoggerFactory.getLogger(ShardIteratorHandler.class); private final Ddb2StreamEndpoint endpoint; - private final ShardList shardList = new ShardList(); + private final ShardTree shardTree = new ShardTree(); - private String currentShardIterator; - private Shard currentShard; + private String streamArn; + private Map<String, String> currentShardIterators = new HashMap<>(); ShardIteratorHandler(Ddb2StreamEndpoint endpoint) { this.endpoint = endpoint; } - String getShardIterator(String resumeFromSequenceNumber) { - ShardIteratorType iteratorType = getEndpoint().getConfiguration().getIteratorType(); - String sequenceNumber = getEndpoint().getSequenceNumber(); - if (resumeFromSequenceNumber != null) { - // Reset things as we're in an error condition. - currentShard = null; - currentShardIterator = null; - iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; - sequenceNumber = resumeFromSequenceNumber; + Map<String, String> getShardIterators() { + if (streamArn == null) { + streamArn = getStreamArn(); } - // either return a cached one or get a new one via a GetShardIterator - // request. - if (currentShardIterator == null) { - ListStreamsResponse streamsListResult = getClient().listStreams( - ListStreamsRequest.builder().tableName(getEndpoint().getConfiguration().getTableName()).build()); - if (streamsListResult.streams().isEmpty()) { - throw new IllegalArgumentException( - "There is no stream associated with table configured. Please create one."); - } - final String streamArn = streamsListResult.streams().get(0).streamArn(); // XXX - // assumes - // there - // is - // only - // one - // stream + // Either return cached ones or get new ones via GetShardIterator requests. + if (currentShardIterators.isEmpty()) { DescribeStreamResponse streamDescriptionResult = getClient().describeStream(DescribeStreamRequest.builder().streamArn(streamArn).build()); - shardList.addAll(streamDescriptionResult.streamDescription().shards()); + shardTree.populate(streamDescriptionResult.streamDescription().shards()); - LOG.trace("Current shard is: {} (in {})", currentShard, shardList); - if (currentShard == null) { - currentShard = resolveNewShard(iteratorType, resumeFromSequenceNumber); - } else { - currentShard = shardList.nextAfter(currentShard); + StreamIteratorType streamIteratorType = getEndpoint().getConfiguration().getStreamIteratorType(); + currentShardIterators = getCurrentShardIterators(streamIteratorType); + } else { + Map<String, String> childShardIterators = new HashMap<>(); + for (Entry<String, String> currentShardIterator : currentShardIterators.entrySet()) { + List<Shard> children = shardTree.getChildren(currentShardIterator.getKey()); + if (children.isEmpty()) { // This is still an active leaf shard, reuse it. + childShardIterators.put(currentShardIterator.getKey(), currentShardIterator.getValue()); + } else { + for (Shard child : children) { // Inactive shard, move down to its children. + String shardIterator = getShardIterator(child.shardId(), ShardIteratorType.TRIM_HORIZON); + childShardIterators.put(child.shardId(), shardIterator); + } + } } - shardList.removeOlderThan(currentShard); - LOG.trace("Next shard is: {} (in {})", currentShard, shardList); + currentShardIterators = childShardIterators; + } + LOG.trace("Shard Iterators are: {}", currentShardIterators); + return currentShardIterators; + } - GetShardIteratorResponse result - = getClient().getShardIterator(buildGetShardIteratorRequest(streamArn, iteratorType, sequenceNumber)); - currentShardIterator = result.shardIterator(); + void updateShardIterator(String shardId, String nextShardIterator) { + if (nextShardIterator == null) { // Shard has become inactive and all records have been consumed. + currentShardIterators.remove(shardId); + } else { + currentShardIterators.put(shardId, nextShardIterator); } - LOG.trace("Shard Iterator is: {}", currentShardIterator); - return currentShardIterator; } - private GetShardIteratorRequest buildGetShardIteratorRequest( - final String streamArn, ShardIteratorType iteratorType, String sequenceNumber) { - GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder().streamArn(streamArn) - .shardId(currentShard.shardId()).shardIteratorType(iteratorType); - switch (iteratorType) { - case AFTER_SEQUENCE_NUMBER: - case AT_SEQUENCE_NUMBER: - // if you request with a sequence number that is LESS than the - // start of the shard, you get a HTTP 400 from AWS. - // So only add the sequence number if the endpoints - // sequence number is less than or equal to the starting - // sequence for the shard. - // Otherwise change the shart iterator type to trim_horizon - // because we get a 400 when we use one of the - // {at,after}_sequence_number iterator types and don't supply - // a sequence number. - if (BigIntComparisons.Conditions.LTEQ.matches( - new BigInteger(currentShard.sequenceNumberRange().startingSequenceNumber()), - new BigInteger(sequenceNumber))) { - req.sequenceNumber(sequenceNumber); - } else { - req.shardIteratorType(ShardIteratorType.TRIM_HORIZON); - } - break; - default: + String requestFreshShardIterator(String shardId, String lastSeenSequenceNumber) { + String shardIterator = getShardIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeenSequenceNumber); + currentShardIterators.put(shardId, shardIterator); + return shardIterator; + } + + Ddb2StreamEndpoint getEndpoint() { + return endpoint; + } + + private String getStreamArn() { + ListStreamsResponse streamsListResult = getClient().listStreams( + ListStreamsRequest.builder().tableName(getEndpoint().getConfiguration().getTableName()).build()); + if (streamsListResult.streams().isEmpty()) { + throw new IllegalArgumentException( + "There is no stream associated with table configured. Please create one."); } - return req.build(); + return streamsListResult.streams().get(0).streamArn(); // XXX assumes there is only one stream } - private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) { - switch (type) { - case AFTER_SEQUENCE_NUMBER: - return shardList.afterSeq(resumeFrom != null ? resumeFrom : getEndpoint().getSequenceNumber()); - case AT_SEQUENCE_NUMBER: - return shardList.atSeq(getEndpoint().getSequenceNumber()); - case TRIM_HORIZON: - return shardList.first(); - case LATEST: + private Map<String, String> getCurrentShardIterators(StreamIteratorType streamIteratorType) { + List<Shard> currentShards; + ShardIteratorType shardIteratorType; + switch (streamIteratorType) { + case FROM_START: + currentShards = shardTree.getRoots(); + shardIteratorType = ShardIteratorType.TRIM_HORIZON; + break; + case FROM_LATEST: default: - return shardList.last(); + currentShards = shardTree.getLeaves(); + shardIteratorType = ShardIteratorType.LATEST; + } + + Map<String, String> shardIterators = new HashMap<>(); + for (Shard currentShard : currentShards) { + String shardIterator = getShardIterator(currentShard.shardId(), shardIteratorType); + shardIterators.put(currentShard.shardId(), shardIterator); } + return shardIterators; } - void updateShardIterator(String nextShardIterator) { - this.currentShardIterator = nextShardIterator; + private String getShardIterator(String shardId, ShardIteratorType shardIteratorType) { + return getShardIterator(shardId, shardIteratorType, null); } - Ddb2StreamEndpoint getEndpoint() { - return endpoint; + private String getShardIterator(String shardId, ShardIteratorType shardIteratorType, String lastSeenSequenceNumber) { + GetShardIteratorRequest request = GetShardIteratorRequest.builder() + .streamArn(streamArn) + .shardId(shardId) + .shardIteratorType(shardIteratorType) + .sequenceNumber(lastSeenSequenceNumber) + .build(); + return getClient().getShardIterator(request).shardIterator(); } private DynamoDbStreamsClient getClient() { diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardTree.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardTree.java new file mode 100644 index 0000000..97821cc --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardTree.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.ddbstream; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +class ShardTree { + + private final Map<String, Shard> shards = new HashMap<>(); + + void populate(Collection<Shard> shards) { + this.shards.clear(); + for (Shard shard : shards) { + this.shards.put(shard.shardId(), shard); + } + } + + List<Shard> getRoots() { + List<Shard> roots = shards.values() + .stream() + .filter(s -> !shards.containsKey(s.parentShardId())) + .collect(Collectors.toList()); + if (roots.isEmpty()) { + throw new IllegalStateException("Unable to find an unparented shard in " + shards); + } + return roots; + } + + List<Shard> getLeaves() { + return shards.values() + .stream() + .filter(s -> s.sequenceNumberRange().endingSequenceNumber() == null) + .collect(Collectors.toList()); + } + + List<Shard> getChildren(String shardId) { + return shards.values() + .stream() + .filter(s -> shardId.equals(s.parentShardId())) + .collect(Collectors.toList()); + } + + @Override + public String toString() { + return "ShardList{" + "shards=" + shards + '}'; + } + +} diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java deleted file mode 100644 index 0178d1a..0000000 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.ddbstream; - -import org.apache.camel.Converter; - -// Allow to ignore this type converter if the ddbstream JARs are not present on the classpath -@Converter(generateLoader = true, ignoreOnLoadError = true) -public final class StringSequenceNumberConverter { - - private StringSequenceNumberConverter() { - } - - @Converter - public static SequenceNumberProvider toSequenceNumberProvider(String sequenceNumber) { - return new StaticSequenceNumberProvider(sequenceNumber); - } -} diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamlessClientMock.java similarity index 60% rename from components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java rename to components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamlessClientMock.java index bdf90d5..0cf72f6 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamlessClientMock.java @@ -16,16 +16,24 @@ */ package org.apache.camel.component.aws2.ddbstream; -public class StaticSequenceNumberProvider implements SequenceNumberProvider { +import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest; +import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse; +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; - private final String sequenceNumber; +class AmazonDDBStreamlessClientMock implements DynamoDbStreamsClient { - public StaticSequenceNumberProvider(String sequenceNumber) { - this.sequenceNumber = sequenceNumber; + @Override + public ListStreamsResponse listStreams(ListStreamsRequest listStreamsRequest) { + return ListStreamsResponse.builder().build(); } @Override - public String getSequenceNumber() { - return sequenceNumber; + public String serviceName() { + return DynamoDbStreamsClient.SERVICE_NAME; } + + @Override + public void close() { + } + } diff --git a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamsClientMock.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamsClientMock.java new file mode 100644 index 0000000..b59d31e --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamsClientMock.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.ddbstream; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest; +import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.Stream; +import software.amazon.awssdk.services.dynamodb.model.StreamDescription; +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; + +import static org.apache.camel.component.aws2.ddbstream.ShardFixtures.STREAM_ARN; + +class AmazonDDBStreamsClientMock implements DynamoDbStreamsClient { + + private final Map<Shard, String> shardsToIterators = new HashMap<>(); + + @Override + public ListStreamsResponse listStreams(ListStreamsRequest listStreamsRequest) { + return ListStreamsResponse.builder().streams(Stream.builder().streamArn(STREAM_ARN).build()).build(); + } + + @Override + public DescribeStreamResponse describeStream(DescribeStreamRequest describeStreamRequest) { + return DescribeStreamResponse.builder() + .streamDescription(StreamDescription.builder().shards(shardsToIterators.keySet()).build()).build(); + } + + @Override + public GetShardIteratorResponse getShardIterator(GetShardIteratorRequest request) { + String shardIterator = shardsToIterators.entrySet().stream() + .filter(s -> s.getKey().shardId().equals(request.shardId())) + .map(Entry::getValue) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No mocked reponse was configured for " + request.shardId())); + return GetShardIteratorResponse.builder().shardIterator(shardIterator).build(); + } + + @Override + public String serviceName() { + return DynamoDbStreamsClient.SERVICE_NAME; + } + + @Override + public void close() { + } + + void setMockedShardAndIteratorResponse(Shard shard, String iterator) { + shardsToIterators.put(shard, iterator); + } +} diff --git a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardFixtures.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardFixtures.java new file mode 100644 index 0000000..5e7a3fb --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardFixtures.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.ddbstream; + +import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange; +import software.amazon.awssdk.services.dynamodb.model.Shard; + +final class ShardFixtures { + + public static final String STREAM_ARN = "arn:aws:dynamodb:eu-west-1:1234:table/some-table/stream/2021-05-07T09:03:40.295"; + + // + // shard 0 + // / \ + // shard 1 shard 2 + // / \ / \ + // shard 3 shard 4 shard 5 shard 6 + // + public static final Shard SHARD_0 = Shard.builder() + .shardId("SHARD_0") + .sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("4100000000019118544662") + .endingSequenceNumber("4100000000019118559460").build()) + .build(); + public static final Shard SHARD_1 = Shard.builder() + .shardId("SHARD_1") + .parentShardId("SHARD_0") + .sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("52700000000007125283545") + .endingSequenceNumber("52700000000007125283545").build()) + .build(); + public static final Shard SHARD_2 = Shard.builder() + .shardId("SHARD_2") + .parentShardId("SHARD_0") + .sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("52700000000020262580503") + .endingSequenceNumber("52700000000020262580503").build()) + .build(); + public static final Shard SHARD_3 = Shard.builder() + .shardId("SHARD_3") + .parentShardId("SHARD_1") + .sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("74100000000005516202603").build()) + .build(); + public static final Shard SHARD_4 = Shard.builder() + .shardId("SHARD_4") + .parentShardId("SHARD_1") + .sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("74100000000044018483325").build()) + .build(); + public static final Shard SHARD_5 = Shard.builder() + .shardId("SHARD_5") + .parentShardId("SHARD_2") + .sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("105800000000033207048658").build()) + .build(); + public static final Shard SHARD_6 = Shard.builder() + .shardId("SHARD_6") + .parentShardId("SHARD_2") + .sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("105800000000025199618049").build()) + .build(); + + private ShardFixtures() { + // Utility class, not called. + } +} diff --git a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandlerTest.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandlerTest.java new file mode 100644 index 0000000..7aebb2b --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandlerTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.ddbstream; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.component.aws2.ddbstream.ShardFixtures.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ShardIteratorHandlerTest extends CamelTestSupport { + + private static final String SHARD_ITERATOR_0 = STREAM_ARN + "|1|hash-0"; + private static final String SHARD_ITERATOR_1 = STREAM_ARN + "|1|hash-1"; + private static final String SHARD_ITERATOR_2 = STREAM_ARN + "|1|hash-2"; + private static final String SHARD_ITERATOR_3 = STREAM_ARN + "|1|hash-3"; + private static final String SHARD_ITERATOR_4 = STREAM_ARN + "|1|hash-4"; + private static final String SHARD_ITERATOR_5 = STREAM_ARN + "|1|hash-5"; + private static final String SHARD_ITERATOR_6 = STREAM_ARN + "|1|hash-6"; + + private Ddb2StreamComponent component; + private AmazonDDBStreamsClientMock dynamoDbStreamsClient; + + @BeforeEach + void setup() throws Exception { + component = context.getComponent("aws2-ddbstream", Ddb2StreamComponent.class); + dynamoDbStreamsClient = new AmazonDDBStreamsClientMock(); + component.getConfiguration().setAmazonDynamoDbStreamsClient(dynamoDbStreamsClient); + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_0, SHARD_ITERATOR_0); + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_1, SHARD_ITERATOR_1); + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_2, SHARD_ITERATOR_2); + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_3, SHARD_ITERATOR_3); + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_4, SHARD_ITERATOR_4); + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_5, SHARD_ITERATOR_5); + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_6, SHARD_ITERATOR_6); + } + + @Test + void shouldReturnLeafShardIterators() throws Exception { + component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST); + Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) component.createEndpoint("aws2-ddbstreams://myTable"); + ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint); + endpoint.doStart(); + + Map<String, String> expectedShardIterators = new HashMap<>(); + expectedShardIterators.put(SHARD_3.shardId(), SHARD_ITERATOR_3); + expectedShardIterators.put(SHARD_4.shardId(), SHARD_ITERATOR_4); + expectedShardIterators.put(SHARD_5.shardId(), SHARD_ITERATOR_5); + expectedShardIterators.put(SHARD_6.shardId(), SHARD_ITERATOR_6); + assertEquals(expectedShardIterators, underTest.getShardIterators()); + } + + @Test + void shouldReturnRootShardIterator() throws Exception { + component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_START); + Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) component.createEndpoint("aws2-ddbstreams://myTable"); + ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint); + endpoint.doStart(); + + assertEquals(Collections.singletonMap(SHARD_0.shardId(), SHARD_ITERATOR_0), underTest.getShardIterators()); + } + + @Test + void shouldProgressThroughTreeWhenShardIteratorsAreRetrievedRepeatedly() throws Exception { + component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_START); + Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) component.createEndpoint("aws2-ddbstreams://myTable"); + ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint); + endpoint.doStart(); + + assertEquals(Collections.singletonMap(SHARD_0.shardId(), SHARD_ITERATOR_0), underTest.getShardIterators()); + Map<String, String> expectedShardIterators1 = new HashMap<>(); + expectedShardIterators1.put(SHARD_1.shardId(), SHARD_ITERATOR_1); + expectedShardIterators1.put(SHARD_2.shardId(), SHARD_ITERATOR_2); + assertEquals(expectedShardIterators1, underTest.getShardIterators()); + Map<String, String> expectedShardIterators2 = new HashMap<>(); + expectedShardIterators2.put(SHARD_3.shardId(), SHARD_ITERATOR_3); + expectedShardIterators2.put(SHARD_4.shardId(), SHARD_ITERATOR_4); + expectedShardIterators2.put(SHARD_5.shardId(), SHARD_ITERATOR_5); + expectedShardIterators2.put(SHARD_6.shardId(), SHARD_ITERATOR_6); + assertEquals(expectedShardIterators2, underTest.getShardIterators()); + Map<String, String> expectedShardIterators3 = expectedShardIterators2; + assertEquals(expectedShardIterators3, underTest.getShardIterators()); + } + + @Test + void shouldUpdateShardIterator() throws Exception { + component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST); + Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) component.createEndpoint("aws2-ddbstreams://myTable"); + ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint); + endpoint.doStart(); + + underTest.getShardIterators(); + String updatedShardIterator5 = STREAM_ARN + "|1|hash-5-new"; + underTest.updateShardIterator(SHARD_5.shardId(), updatedShardIterator5); + + assertEquals(updatedShardIterator5, underTest.getShardIterators().get(SHARD_5.shardId())); + } + + @Test + void shouldRemoveShardIfNullIteratorIsProvided() throws Exception { + component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST); + Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) component.createEndpoint("aws2-ddbstreams://myTable"); + ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint); + endpoint.doStart(); + + underTest.getShardIterators(); + underTest.updateShardIterator(SHARD_3.shardId(), null); + + assertFalse(underTest.getShardIterators().containsKey(SHARD_3.shardId())); + } + + @Test + void shouldRequestAndCacheFreshShardIterator() throws Exception { + component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST); + Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) component.createEndpoint("aws2-ddbstreams://myTable"); + ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint); + endpoint.doStart(); + + underTest.getShardIterators(); + String freshShardIterator4 = STREAM_ARN + "|1|hash-4-fresh"; + dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_4, freshShardIterator4); + + assertEquals(freshShardIterator4, underTest.requestFreshShardIterator(SHARD_4.shardId(), freshShardIterator4)); + assertEquals(freshShardIterator4, underTest.getShardIterators().get(SHARD_4.shardId())); + } + + @Test + void shouldThrowIllegalArgumentExceptionIfNoStreamsAreReturned() throws Exception { + AmazonDDBStreamlessClientMock dynamoDbStreamsClient = new AmazonDDBStreamlessClientMock(); + component.getConfiguration().setAmazonDynamoDbStreamsClient(dynamoDbStreamsClient); + + Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) component.createEndpoint("aws2-ddbstreams://myTable"); + ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint); + endpoint.doStart(); + + assertThrows(IllegalArgumentException.class, () -> underTest.getShardIterators()); + } + +} diff --git a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardTreeTest.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardTreeTest.java new file mode 100644 index 0000000..f2a84a4 --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardTreeTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.ddbstream; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import static org.apache.camel.component.aws2.ddbstream.ShardFixtures.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ShardTreeTest { + + private final ShardTree underTest = new ShardTree(); + + @Test + void shouldGetLeafShards() { + underTest.populate(Arrays.asList(SHARD_0, SHARD_1, SHARD_2, SHARD_3, SHARD_4, SHARD_5, SHARD_6)); + + assertEquals(Arrays.asList(SHARD_5, SHARD_6, SHARD_3, SHARD_4), underTest.getLeaves()); + } + + @Test + void shouldReturnEmptyListIfAllShardsHaveAnEndingSequenceNumber() { + underTest.populate(Arrays.asList(SHARD_0, SHARD_1, SHARD_2)); + + assertEquals(Arrays.asList(), underTest.getLeaves()); + } + + @Test + void shouldGetRootShards() { + underTest.populate(Arrays.asList(SHARD_1, SHARD_2, SHARD_3, SHARD_4, SHARD_5, SHARD_6)); + + assertEquals(Arrays.asList(SHARD_1, SHARD_2), underTest.getRoots()); + } + + @Test + void shouldThrowIfNoUnparentedShardsCanBeFound() { + Shard selfParentingShard = Shard.builder().shardId("SHARD_X").parentShardId("SHARD_X").build(); + underTest.populate(Arrays.asList(selfParentingShard)); + + assertThrows(IllegalStateException.class, () -> underTest.getRoots()); + } + + @Test + void shouldGetChildShards() { + underTest.populate(Arrays.asList(SHARD_0, SHARD_1, SHARD_2, SHARD_3, SHARD_4, SHARD_5, SHARD_6)); + + assertEquals(Arrays.asList(SHARD_5, SHARD_6), underTest.getChildren("SHARD_2")); + } + + @Test + void shouldReturnEmptyListIfTheShardHasNoChildren() { + underTest.populate(Arrays.asList(SHARD_6)); + + assertEquals(Arrays.asList(), underTest.getChildren("SHARD_6")); + } + +} diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java index 9242576..3d7eb3a 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java @@ -106,26 +106,6 @@ public interface Aws2DdbstreamComponentBuilderFactory { return this; } /** - * Defines where in the DynaboDB stream to start getting records. Note - * that using TRIM_HORIZON can cause a significant delay before the - * stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are - * used, then a sequenceNumberProvider MUST be supplied. - * - * The option is a: - * <code>software.amazon.awssdk.services.dynamodb.model.ShardIteratorType</code> type. - * - * Default: LATEST - * Group: consumer - * - * @param iteratorType the value to set - * @return the dsl builder - */ - default Aws2DdbstreamComponentBuilder iteratorType( - software.amazon.awssdk.services.dynamodb.model.ShardIteratorType iteratorType) { - doSetProperty("iteratorType", iteratorType); - return this; - } - /** * Maximum number of records that will be fetched in each poll. * * The option is a: <code>int</code> type. @@ -219,21 +199,22 @@ public interface Aws2DdbstreamComponentBuilderFactory { return this; } /** - * Provider for the sequence number when using one of the two - * ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be a - * registry reference or a literal sequence number. + * Defines where in the DynamoDB stream to start getting records. Note + * that using FROM_START can cause a significant delay before the stream + * has caught up to real-time. * * The option is a: - * <code>org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider</code> type. + * <code>org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType</code> type. * + * Default: FROM_LATEST * Group: consumer * - * @param sequenceNumberProvider the value to set + * @param streamIteratorType the value to set * @return the dsl builder */ - default Aws2DdbstreamComponentBuilder sequenceNumberProvider( - org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider sequenceNumberProvider) { - doSetProperty("sequenceNumberProvider", sequenceNumberProvider); + default Aws2DdbstreamComponentBuilder streamIteratorType( + org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType streamIteratorType) { + doSetProperty("streamIteratorType", streamIteratorType); return this; } /** @@ -365,14 +346,13 @@ public interface Aws2DdbstreamComponentBuilderFactory { case "amazonDynamoDbStreamsClient": getOrCreateConfiguration((Ddb2StreamComponent) component).setAmazonDynamoDbStreamsClient((software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient) value); return true; case "bridgeErrorHandler": ((Ddb2StreamComponent) component).setBridgeErrorHandler((boolean) value); return true; case "configuration": ((Ddb2StreamComponent) component).setConfiguration((org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration) value); return true; - case "iteratorType": getOrCreateConfiguration((Ddb2StreamComponent) component).setIteratorType((software.amazon.awssdk.services.dynamodb.model.ShardIteratorType) value); return true; case "maxResultsPerRequest": getOrCreateConfiguration((Ddb2StreamComponent) component).setMaxResultsPerRequest((int) value); return true; case "overrideEndpoint": getOrCreateConfiguration((Ddb2StreamComponent) component).setOverrideEndpoint((boolean) value); return true; case "proxyHost": getOrCreateConfiguration((Ddb2StreamComponent) component).setProxyHost((java.lang.String) value); return true; case "proxyPort": getOrCreateConfiguration((Ddb2StreamComponent) component).setProxyPort((java.lang.Integer) value); return true; case "proxyProtocol": getOrCreateConfiguration((Ddb2StreamComponent) component).setProxyProtocol((software.amazon.awssdk.core.Protocol) value); return true; case "region": getOrCreateConfiguration((Ddb2StreamComponent) component).setRegion((java.lang.String) value); return true; - case "sequenceNumberProvider": getOrCreateConfiguration((Ddb2StreamComponent) component).setSequenceNumberProvider((org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider) value); return true; + case "streamIteratorType": getOrCreateConfiguration((Ddb2StreamComponent) component).setStreamIteratorType((org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType) value); return true; case "trustAllCertificates": getOrCreateConfiguration((Ddb2StreamComponent) component).setTrustAllCertificates((boolean) value); return true; case "uriEndpointOverride": getOrCreateConfiguration((Ddb2StreamComponent) component).setUriEndpointOverride((java.lang.String) value); return true; case "useDefaultCredentialsProvider": getOrCreateConfiguration((Ddb2StreamComponent) component).setUseDefaultCredentialsProvider((boolean) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java index 7b2a3d5..471ed16 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java @@ -124,45 +124,6 @@ public interface Ddb2StreamEndpointBuilderFactory { return this; } /** - * Defines where in the DynaboDB stream to start getting records. Note - * that using TRIM_HORIZON can cause a significant delay before the - * stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are - * used, then a sequenceNumberProvider MUST be supplied. - * - * The option is a: - * <code>software.amazon.awssdk.services.dynamodb.model.ShardIteratorType</code> type. - * - * Default: LATEST - * Group: consumer - * - * @param iteratorType the value to set - * @return the dsl builder - */ - default Ddb2StreamEndpointBuilder iteratorType( - ShardIteratorType iteratorType) { - doSetProperty("iteratorType", iteratorType); - return this; - } - /** - * Defines where in the DynaboDB stream to start getting records. Note - * that using TRIM_HORIZON can cause a significant delay before the - * stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are - * used, then a sequenceNumberProvider MUST be supplied. - * - * The option will be converted to a - * <code>software.amazon.awssdk.services.dynamodb.model.ShardIteratorType</code> type. - * - * Default: LATEST - * Group: consumer - * - * @param iteratorType the value to set - * @return the dsl builder - */ - default Ddb2StreamEndpointBuilder iteratorType(String iteratorType) { - doSetProperty("iteratorType", iteratorType); - return this; - } - /** * Maximum number of records that will be fetched in each poll. * * The option is a: <code>int</code> type. @@ -352,39 +313,41 @@ public interface Ddb2StreamEndpointBuilderFactory { return this; } /** - * Provider for the sequence number when using one of the two - * ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be a - * registry reference or a literal sequence number. + * Defines where in the DynamoDB stream to start getting records. Note + * that using FROM_START can cause a significant delay before the stream + * has caught up to real-time. * * The option is a: - * <code>org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider</code> type. + * <code>org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration$StreamIteratorType</code> type. * + * Default: FROM_LATEST * Group: consumer * - * @param sequenceNumberProvider the value to set + * @param streamIteratorType the value to set * @return the dsl builder */ - default Ddb2StreamEndpointBuilder sequenceNumberProvider( - Object sequenceNumberProvider) { - doSetProperty("sequenceNumberProvider", sequenceNumberProvider); + default Ddb2StreamEndpointBuilder streamIteratorType( + StreamIteratorType streamIteratorType) { + doSetProperty("streamIteratorType", streamIteratorType); return this; } /** - * Provider for the sequence number when using one of the two - * ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be a - * registry reference or a literal sequence number. + * Defines where in the DynamoDB stream to start getting records. Note + * that using FROM_START can cause a significant delay before the stream + * has caught up to real-time. * * The option will be converted to a - * <code>org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider</code> type. + * <code>org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration$StreamIteratorType</code> type. * + * Default: FROM_LATEST * Group: consumer * - * @param sequenceNumberProvider the value to set + * @param streamIteratorType the value to set * @return the dsl builder */ - default Ddb2StreamEndpointBuilder sequenceNumberProvider( - String sequenceNumberProvider) { - doSetProperty("sequenceNumberProvider", sequenceNumberProvider); + default Ddb2StreamEndpointBuilder streamIteratorType( + String streamIteratorType) { + doSetProperty("streamIteratorType", streamIteratorType); return this; } /** @@ -1097,18 +1060,6 @@ public interface Ddb2StreamEndpointBuilderFactory { } /** - * Proxy enum for - * <code>software.amazon.awssdk.services.dynamodb.model.ShardIteratorType</code> enum. - */ - enum ShardIteratorType { - TRIM_HORIZON, - LATEST, - AT_SEQUENCE_NUMBER, - AFTER_SEQUENCE_NUMBER, - UNKNOWN_TO_SDK_VERSION; - } - - /** * Proxy enum for <code>software.amazon.awssdk.core.Protocol</code> enum. */ enum Protocol { @@ -1116,6 +1067,15 @@ public interface Ddb2StreamEndpointBuilderFactory { HTTPS; } + /** + * Proxy enum for + * <code>org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration$StreamIteratorType</code> enum. + */ + enum StreamIteratorType { + FROM_LATEST, + FROM_START; + } + public interface Ddb2StreamBuilders { /** * AWS DynamoDB Streams (camel-aws2-ddb)