This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 49039bb5afc12a083311b5bac56018b2b5d7be84
Author: Pierre-Yves B <pierre-yves.bigour...@bbc.co.uk>
AuthorDate: Wed Sep 22 14:32:52 2021 +0100

    CAMEL-16594 Support more than one active shard in DynamoDB stream.
---
 .../ddbstream/Ddb2StreamComponentConfigurer.java   |  18 +--
 .../ddbstream/Ddb2StreamEndpointConfigurer.java    |  18 +--
 .../ddbstream/Ddb2StreamEndpointUriFactory.java    |   5 +-
 .../StringSequenceNumberConverterLoader.java       |  56 -------
 .../services/org/apache/camel/TypeConverterLoader  |   2 -
 .../component/aws2/ddbstream/aws2-ddbstream.json   |   6 +-
 .../aws2/ddbstream/Ddb2StreamConfiguration.java    |  38 ++---
 .../aws2/ddbstream/Ddb2StreamConsumer.java         |  95 +++++-------
 .../aws2/ddbstream/Ddb2StreamEndpoint.java         |  21 +--
 .../aws2/ddbstream/SequenceNumberProvider.java     |  21 ---
 .../aws2/ddbstream/ShardIteratorHandler.java       | 166 ++++++++++-----------
 .../camel/component/aws2/ddbstream/ShardTree.java  |  68 +++++++++
 .../ddbstream/StringSequenceNumberConverter.java   |  32 ----
 .../ddbstream/AmazonDDBStreamlessClientMock.java}  |  20 ++-
 .../aws2/ddbstream/AmazonDDBStreamsClientMock.java |  73 +++++++++
 .../component/aws2/ddbstream/ShardFixtures.java    |  74 +++++++++
 .../aws2/ddbstream/ShardIteratorHandlerTest.java   | 161 ++++++++++++++++++++
 .../component/aws2/ddbstream/ShardTreeTest.java    |  75 ++++++++++
 .../dsl/Aws2DdbstreamComponentBuilderFactory.java  |  40 ++---
 .../dsl/Ddb2StreamEndpointBuilderFactory.java      |  94 ++++--------
 20 files changed, 654 insertions(+), 429 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java
 
b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java
index 28e2949..35ff908 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamComponentConfigurer.java
@@ -37,8 +37,6 @@ public class Ddb2StreamComponentConfigurer extends 
PropertyConfigurerSupport imp
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": 
target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); 
return true;
         case "configuration": target.setConfiguration(property(camelContext, 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.class, 
value)); return true;
-        case "iteratortype":
-        case "iteratorType": 
getOrCreateConfiguration(target).setIteratorType(property(camelContext, 
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class, 
value)); return true;
         case "maxresultsperrequest":
         case "maxResultsPerRequest": 
getOrCreateConfiguration(target).setMaxResultsPerRequest(property(camelContext, 
int.class, value)); return true;
         case "overrideendpoint":
@@ -52,8 +50,8 @@ public class Ddb2StreamComponentConfigurer extends 
PropertyConfigurerSupport imp
         case "region": 
getOrCreateConfiguration(target).setRegion(property(camelContext, 
java.lang.String.class, value)); return true;
         case "secretkey":
         case "secretKey": 
getOrCreateConfiguration(target).setSecretKey(property(camelContext, 
java.lang.String.class, value)); return true;
-        case "sequencenumberprovider":
-        case "sequenceNumberProvider": 
getOrCreateConfiguration(target).setSequenceNumberProvider(property(camelContext,
 org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, 
value)); return true;
+        case "streamiteratortype":
+        case "streamIteratorType": 
getOrCreateConfiguration(target).setStreamIteratorType(property(camelContext, 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class,
 value)); return true;
         case "trustallcertificates":
         case "trustAllCertificates": 
getOrCreateConfiguration(target).setTrustAllCertificates(property(camelContext, 
boolean.class, value)); return true;
         case "uriendpointoverride":
@@ -81,8 +79,6 @@ public class Ddb2StreamComponentConfigurer extends 
PropertyConfigurerSupport imp
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return boolean.class;
         case "configuration": return 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.class;
-        case "iteratortype":
-        case "iteratorType": return 
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class;
         case "maxresultsperrequest":
         case "maxResultsPerRequest": return int.class;
         case "overrideendpoint":
@@ -96,8 +92,8 @@ public class Ddb2StreamComponentConfigurer extends 
PropertyConfigurerSupport imp
         case "region": return java.lang.String.class;
         case "secretkey":
         case "secretKey": return java.lang.String.class;
-        case "sequencenumberprovider":
-        case "sequenceNumberProvider": return 
org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class;
+        case "streamiteratortype":
+        case "streamIteratorType": return 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class;
         case "trustallcertificates":
         case "trustAllCertificates": return boolean.class;
         case "uriendpointoverride":
@@ -121,8 +117,6 @@ public class Ddb2StreamComponentConfigurer extends 
PropertyConfigurerSupport imp
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "configuration": return target.getConfiguration();
-        case "iteratortype":
-        case "iteratorType": return 
getOrCreateConfiguration(target).getIteratorType();
         case "maxresultsperrequest":
         case "maxResultsPerRequest": return 
getOrCreateConfiguration(target).getMaxResultsPerRequest();
         case "overrideendpoint":
@@ -136,8 +130,8 @@ public class Ddb2StreamComponentConfigurer extends 
PropertyConfigurerSupport imp
         case "region": return getOrCreateConfiguration(target).getRegion();
         case "secretkey":
         case "secretKey": return 
getOrCreateConfiguration(target).getSecretKey();
-        case "sequencenumberprovider":
-        case "sequenceNumberProvider": return 
getOrCreateConfiguration(target).getSequenceNumberProvider();
+        case "streamiteratortype":
+        case "streamIteratorType": return 
getOrCreateConfiguration(target).getStreamIteratorType();
         case "trustallcertificates":
         case "trustAllCertificates": return 
getOrCreateConfiguration(target).isTrustAllCertificates();
         case "uriendpointoverride":
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java
 
b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java
index 4c4af9b..4fde0a8 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointConfigurer.java
@@ -41,8 +41,6 @@ public class Ddb2StreamEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "greedy": target.setGreedy(property(camelContext, boolean.class, 
value)); return true;
         case "initialdelay":
         case "initialDelay": target.setInitialDelay(property(camelContext, 
long.class, value)); return true;
-        case "iteratortype":
-        case "iteratorType": 
target.getConfiguration().setIteratorType(property(camelContext, 
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class, 
value)); return true;
         case "maxresultsperrequest":
         case "maxResultsPerRequest": 
target.getConfiguration().setMaxResultsPerRequest(property(camelContext, 
int.class, value)); return true;
         case "overrideendpoint":
@@ -69,10 +67,10 @@ public class Ddb2StreamEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "secretKey": 
target.getConfiguration().setSecretKey(property(camelContext, 
java.lang.String.class, value)); return true;
         case "sendemptymessagewhenidle":
         case "sendEmptyMessageWhenIdle": 
target.setSendEmptyMessageWhenIdle(property(camelContext, boolean.class, 
value)); return true;
-        case "sequencenumberprovider":
-        case "sequenceNumberProvider": 
target.getConfiguration().setSequenceNumberProvider(property(camelContext, 
org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, 
value)); return true;
         case "startscheduler":
         case "startScheduler": target.setStartScheduler(property(camelContext, 
boolean.class, value)); return true;
+        case "streamiteratortype":
+        case "streamIteratorType": 
target.getConfiguration().setStreamIteratorType(property(camelContext, 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class,
 value)); return true;
         case "timeunit":
         case "timeUnit": target.setTimeUnit(property(camelContext, 
java.util.concurrent.TimeUnit.class, value)); return true;
         case "trustallcertificates":
@@ -115,8 +113,6 @@ public class Ddb2StreamEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "greedy": return boolean.class;
         case "initialdelay":
         case "initialDelay": return long.class;
-        case "iteratortype":
-        case "iteratorType": return 
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.class;
         case "maxresultsperrequest":
         case "maxResultsPerRequest": return int.class;
         case "overrideendpoint":
@@ -143,10 +139,10 @@ public class Ddb2StreamEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "secretKey": return java.lang.String.class;
         case "sendemptymessagewhenidle":
         case "sendEmptyMessageWhenIdle": return boolean.class;
-        case "sequencenumberprovider":
-        case "sequenceNumberProvider": return 
org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class;
         case "startscheduler":
         case "startScheduler": return boolean.class;
+        case "streamiteratortype":
+        case "streamIteratorType": return 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType.class;
         case "timeunit":
         case "timeUnit": return java.util.concurrent.TimeUnit.class;
         case "trustallcertificates":
@@ -185,8 +181,6 @@ public class Ddb2StreamEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "greedy": return target.isGreedy();
         case "initialdelay":
         case "initialDelay": return target.getInitialDelay();
-        case "iteratortype":
-        case "iteratorType": return 
target.getConfiguration().getIteratorType();
         case "maxresultsperrequest":
         case "maxResultsPerRequest": return 
target.getConfiguration().getMaxResultsPerRequest();
         case "overrideendpoint":
@@ -213,10 +207,10 @@ public class Ddb2StreamEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "secretKey": return target.getConfiguration().getSecretKey();
         case "sendemptymessagewhenidle":
         case "sendEmptyMessageWhenIdle": return 
target.isSendEmptyMessageWhenIdle();
-        case "sequencenumberprovider":
-        case "sequenceNumberProvider": return 
target.getConfiguration().getSequenceNumberProvider();
         case "startscheduler":
         case "startScheduler": return target.isStartScheduler();
+        case "streamiteratortype":
+        case "streamIteratorType": return 
target.getConfiguration().getStreamIteratorType();
         case "timeunit":
         case "timeUnit": return target.getTimeUnit();
         case "trustallcertificates":
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java
 
b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java
index 9a55969..3e374ab 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class Ddb2StreamEndpointUriFactory extends 
org.apache.camel.support.compo
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(34);
+        Set<String> props = new HashSet<>(33);
         props.add("backoffMultiplier");
         props.add("initialDelay");
         props.add("amazonDynamoDbStreamsClient");
@@ -37,14 +37,12 @@ public class Ddb2StreamEndpointUriFactory extends 
org.apache.camel.support.compo
         props.add("timeUnit");
         props.add("proxyProtocol");
         props.add("secretKey");
-        props.add("iteratorType");
         props.add("sendEmptyMessageWhenIdle");
         props.add("schedulerProperties");
         props.add("uriEndpointOverride");
         props.add("exchangePattern");
         props.add("useDefaultCredentialsProvider");
         props.add("proxyHost");
-        props.add("sequenceNumberProvider");
         props.add("backoffIdleThreshold");
         props.add("trustAllCertificates");
         props.add("delay");
@@ -53,6 +51,7 @@ public class Ddb2StreamEndpointUriFactory extends 
org.apache.camel.support.compo
         props.add("accessKey");
         props.add("overrideEndpoint");
         props.add("maxResultsPerRequest");
+        props.add("streamIteratorType");
         props.add("region");
         props.add("exceptionHandler");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java
 
b/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java
deleted file mode 100644
index 3ab0a19..0000000
--- 
a/components/camel-aws/camel-aws2-ddb/src/generated/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverterLoader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/* Generated by camel build tools - do NOT edit this file! */
-package org.apache.camel.component.aws2.ddbstream;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.DeferredContextBinding;
-import org.apache.camel.Exchange;
-import org.apache.camel.TypeConversionException;
-import org.apache.camel.TypeConverterLoaderException;
-import org.apache.camel.spi.TypeConverterLoader;
-import org.apache.camel.spi.TypeConverterRegistry;
-import org.apache.camel.support.SimpleTypeConverter;
-import org.apache.camel.support.TypeConverterSupport;
-import org.apache.camel.util.DoubleMap;
-
-/**
- * Generated by camel build tools - do NOT edit this file!
- */
-@SuppressWarnings("unchecked")
-@DeferredContextBinding
-public final class StringSequenceNumberConverterLoader implements 
TypeConverterLoader, CamelContextAware {
-
-    private CamelContext camelContext;
-
-    public StringSequenceNumberConverterLoader() {
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void load(TypeConverterRegistry registry) throws 
TypeConverterLoaderException {
-        try {
-            registerConverters(registry);
-        } catch (Throwable e) {
-            // ignore on load error
-        }
-    }
-
-    private void registerConverters(TypeConverterRegistry registry) {
-        addTypeConverter(registry, 
org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider.class, 
java.lang.String.class, false,
-            (type, exchange, value) -> 
org.apache.camel.component.aws2.ddbstream.StringSequenceNumberConverter.toSequenceNumberProvider((java.lang.String)
 value));
-    }
-
-    private static void addTypeConverter(TypeConverterRegistry registry, 
Class<?> toType, Class<?> fromType, boolean allowNull, 
SimpleTypeConverter.ConversionMethod method) { 
-        registry.addTypeConverter(toType, fromType, new 
SimpleTypeConverter(allowNull, method));
-    }
-
-}
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader
 
b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader
deleted file mode 100644
index d5cbd3a..0000000
--- 
a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader
+++ /dev/null
@@ -1,2 +0,0 @@
-# Generated by camel build tools - do NOT edit this file!
-org.apache.camel.component.aws2.ddbstream.StringSequenceNumberConverterLoader
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json
 
b/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json
index ce2fbf3..533bd93 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json
+++ 
b/components/camel-aws/camel-aws2-ddb/src/generated/resources/org/apache/camel/component/aws2/ddbstream/aws2-ddbstream.json
@@ -25,14 +25,13 @@
     "amazonDynamoDbStreamsClient": { "kind": "property", "displayName": 
"Amazon Dynamo Db Streams Client", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient", 
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "descripti [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
     "configuration": { "kind": "property", "displayName": "Configuration", 
"group": "consumer", "label": "", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"deprecated": false, "autowired": false, "secret": false, "description": "The 
component configuration" },
-    "iteratorType": { "kind": "property", "displayName": "Iterator Type", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "software.amazon.awssdk.services.dynamodb.model.ShardIteratorType", 
"enum": [ "TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER", 
"AFTER_SEQUENCE_NUMBER", "null" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfigu [...]
     "maxResultsPerRequest": { "kind": "property", "displayName": "Max Results 
Per Request", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
     "overrideEndpoint": { "kind": "property", "displayName": "Override 
Endpoint", "group": "consumer", "label": "", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Set the need for 
overidding the endpoint. This option needs to be used in combination wi [...]
     "proxyHost": { "kind": "property", "displayName": "Proxy Host", "group": 
"consumer", "label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "To define a proxy host 
when instantiating the DDBStreams client" },
     "proxyPort": { "kind": "property", "displayName": "Proxy Port", "group": 
"consumer", "label": "", "required": false, "type": "integer", "javaType": 
"java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "To define a proxy port 
when instantiating the DDBStreams client" },
     "proxyProtocol": { "kind": "property", "displayName": "Proxy Protocol", 
"group": "consumer", "label": "", "required": false, "type": "object", 
"javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" 
], "deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"HTTPS", "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "To define a proxy 
protocol when ins [...]
     "region": { "kind": "property", "displayName": "Region", "group": 
"consumer", "label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "The region in which 
DDBStreams client needs to work" },
-    "sequenceNumberProvider": { "kind": "property", "displayName": "Sequence 
Number Provider", "group": "consumer", "label": "consumer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Provider for the 
sequence number w [...]
+    "streamIteratorType": { "kind": "property", "displayName": "Stream 
Iterator Type", "group": "consumer", "label": "consumer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType",
 "enum": [ "FROM_LATEST", "FROM_START" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "FROM_LATEST", "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "conf [...]
     "trustAllCertificates": { "kind": "property", "displayName": "Trust All 
Certificates", "group": "consumer", "label": "", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "If we want to trust all 
certificates in case of overriding the endpoint" },
     "uriEndpointOverride": { "kind": "property", "displayName": "Uri Endpoint 
Override", "group": "consumer", "label": "", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Set the overriding uri 
endpoint. This option needs to be used in combination with overrideEndpoi [...]
     "useDefaultCredentialsProvider": { "kind": "property", "displayName": "Use 
Default Credentials Provider", "group": "consumer", "label": "", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Set whether the DynamoDB 
Streams client should expect to loa [...]
@@ -44,7 +43,6 @@
     "tableName": { "kind": "path", "displayName": "Table Name", "group": 
"consumer", "label": "consumer", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Name of the dynamodb 
table" },
     "amazonDynamoDbStreamsClient": { "kind": "parameter", "displayName": 
"Amazon Dynamo Db Streams Client", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient", 
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "descript [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a m [...]
-    "iteratorType": { "kind": "parameter", "displayName": "Iterator Type", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "software.amazon.awssdk.services.dynamodb.model.ShardIteratorType", 
"enum": [ "TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER", 
"AFTER_SEQUENCE_NUMBER", "null" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfig [...]
     "maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results 
Per Request", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
     "overrideEndpoint": { "kind": "parameter", "displayName": "Override 
Endpoint", "group": "consumer", "label": "", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Set the need for 
overidding the endpoint. This option needs to be used in combination w [...]
     "proxyHost": { "kind": "parameter", "displayName": "Proxy Host", "group": 
"consumer", "label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "To define a proxy host 
when instantiating the DDBStreams client" },
@@ -52,7 +50,7 @@
     "proxyProtocol": { "kind": "parameter", "displayName": "Proxy Protocol", 
"group": "consumer", "label": "", "required": false, "type": "object", 
"javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" 
], "deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"HTTPS", "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "To define a proxy 
protocol when in [...]
     "region": { "kind": "parameter", "displayName": "Region", "group": 
"consumer", "label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "The region in which 
DDBStreams client needs to work" },
     "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send 
Empty Message When Idle", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": "If 
the polling consumer did not poll any files, you can enable this option to send 
an empty message (no body) instead." },
-    "sequenceNumberProvider": { "kind": "parameter", "displayName": "Sequence 
Number Provider", "group": "consumer", "label": "consumer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Provider for the 
sequence number  [...]
+    "streamIteratorType": { "kind": "parameter", "displayName": "Stream 
Iterator Type", "group": "consumer", "label": "consumer", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType",
 "enum": [ "FROM_LATEST", "FROM_START" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "FROM_LATEST", "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", "con [...]
     "trustAllCertificates": { "kind": "parameter", "displayName": "Trust All 
Certificates", "group": "consumer", "label": "", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "If we want to trust all 
certificates in case of overriding the endpoint" },
     "uriEndpointOverride": { "kind": "parameter", "displayName": "Uri Endpoint 
Override", "group": "consumer", "label": "", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Set the overriding uri 
endpoint. This option needs to be used in combination with overrideEndpo [...]
     "useDefaultCredentialsProvider": { "kind": "parameter", "displayName": 
"Use Default Credentials Provider", "group": "consumer", "label": "", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": 
"org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration", 
"configurationField": "configuration", "description": "Set whether the DynamoDB 
Streams client should expect to lo [...]
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java
index 06e7933..9a78be3 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConfiguration.java
@@ -22,7 +22,6 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
 import software.amazon.awssdk.core.Protocol;
-import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 
 @UriParams
@@ -47,18 +46,12 @@ public class Ddb2StreamConfiguration implements Cloneable {
     private int maxResultsPerRequest = 100;
 
     @UriParam(label = "consumer",
-              description = "Defines where in the DynaboDB stream"
-                            + " to start getting records. Note that using 
TRIM_HORIZON can cause a"
-                            + " significant delay before the stream has caught 
up to real-time."
-                            + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a 
sequenceNumberProvider" + " MUST be supplied.",
-              defaultValue = "LATEST")
-    private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
+              description = "Defines where in the DynamoDB stream"
+                            + " to start getting records. Note that using 
FROM_START can cause a"
+                            + " significant delay before the stream has caught 
up to real-time.",
+              defaultValue = "FROM_LATEST")
+    private StreamIteratorType streamIteratorType = 
StreamIteratorType.FROM_LATEST;
 
-    @UriParam(label = "consumer",
-              description = "Provider for the sequence number when"
-                            + " using one of the two 
ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER"
-                            + " iterator types. Can be a registry reference or 
a literal sequence number.")
-    private SequenceNumberProvider sequenceNumberProvider;
     @UriParam(enums = "HTTP,HTTPS", defaultValue = "HTTPS",
               description = "To define a proxy protocol when instantiating the 
DDBStreams client")
     private Protocol proxyProtocol = Protocol.HTTPS;
@@ -127,20 +120,12 @@ public class Ddb2StreamConfiguration implements Cloneable 
{
         this.tableName = tableName;
     }
 
-    public ShardIteratorType getIteratorType() {
-        return iteratorType;
-    }
-
-    public void setIteratorType(ShardIteratorType iteratorType) {
-        this.iteratorType = iteratorType;
+    public StreamIteratorType getStreamIteratorType() {
+        return streamIteratorType;
     }
 
-    public SequenceNumberProvider getSequenceNumberProvider() {
-        return sequenceNumberProvider;
-    }
-
-    public void setSequenceNumberProvider(SequenceNumberProvider 
sequenceNumberProvider) {
-        this.sequenceNumberProvider = sequenceNumberProvider;
+    public void setStreamIteratorType(StreamIteratorType streamIteratorType) {
+        this.streamIteratorType = streamIteratorType;
     }
 
     public Protocol getProxyProtocol() {
@@ -210,4 +195,9 @@ public class Ddb2StreamConfiguration implements Cloneable {
             throw new RuntimeCamelException(e);
         }
     }
+
+    public enum StreamIteratorType {
+        FROM_LATEST,
+        FROM_START
+    }
 }
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index b952176..25044fc 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.component.aws2.ddbstream;
 
-import java.math.BigInteger;
 import java.util.ArrayDeque;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Queue;
 
 import org.apache.camel.AsyncCallback;
@@ -33,14 +35,13 @@ import 
software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.Record;
-import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 
 public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(Ddb2StreamConsumer.class);
 
     private final ShardIteratorHandler shardIteratorHandler;
-    private String lastSeenSequenceNumber;
+    private final Map<String, String> lastSeenSequenceNumbers = new 
HashMap<>();
 
     public Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor 
processor) {
         this(endpoint, processor, new ShardIteratorHandler(endpoint));
@@ -53,29 +54,41 @@ public class Ddb2StreamConsumer extends 
ScheduledBatchPollingConsumer {
 
     @Override
     protected int poll() throws Exception {
-        GetRecordsResponse result;
-        try {
-            GetRecordsRequest.Builder req
-                    = 
GetRecordsRequest.builder().shardIterator(shardIteratorHandler.getShardIterator(null))
-                            
.limit(getEndpoint().getConfiguration().getMaxResultsPerRequest());
-            result = getClient().getRecords(req.build());
-        } catch (ExpiredIteratorException e) {
-            LOG.warn("Expired Shard Iterator, attempting to resume from {}", 
lastSeenSequenceNumber, e);
-            GetRecordsRequest.Builder req
-                    = 
GetRecordsRequest.builder().shardIterator(shardIteratorHandler.getShardIterator(lastSeenSequenceNumber))
-                            
.limit(getEndpoint().getConfiguration().getMaxResultsPerRequest());
-            result = getClient().getRecords(req.build());
-        }
-        List<Record> records = result.records();
-
-        Queue<Exchange> exchanges = createExchanges(records, 
lastSeenSequenceNumber);
-        int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
+        int processedExchangeCount = 0;
+        Map<String, String> shardIterators = 
shardIteratorHandler.getShardIterators();
+        for (Entry<String, String> shardIteratorEntry : 
shardIterators.entrySet()) {
+            int limitPerRecordsRequest = Math.max(1,
+                    getEndpoint().getConfiguration().getMaxResultsPerRequest() 
/ shardIterators.size());
+            String shardId = shardIteratorEntry.getKey();
+            String shardIterator = shardIteratorEntry.getValue();
+            GetRecordsResponse result;
+            try {
+                GetRecordsRequest req = GetRecordsRequest.builder()
+                        .shardIterator(shardIterator)
+                        .limit(limitPerRecordsRequest)
+                        .build();
+                result = getEndpoint().getClient().getRecords(req);
+            } catch (ExpiredIteratorException e) {
+                String lastSeenSequenceNumber = 
lastSeenSequenceNumbers.get(shardId);
+                LOG.warn("Expired Shard Iterator, attempting to resume from 
{}", lastSeenSequenceNumber, e);
+                GetRecordsRequest req = GetRecordsRequest.builder()
+                        
.shardIterator(shardIteratorHandler.requestFreshShardIterator(shardId, 
lastSeenSequenceNumber))
+                        .limit(limitPerRecordsRequest)
+                        .build();
+                result = getEndpoint().getClient().getRecords(req);
+            }
+            List<Record> records = result.records();
+            Queue<Exchange> exchanges = new ArrayDeque<>();
+            for (Record record : records) {
+                exchanges.add(createExchange(record));
+            }
+            processedExchangeCount += processBatch(CastUtils.cast(exchanges));
 
-        shardIteratorHandler.updateShardIterator(result.nextShardIterator());
-        if (!records.isEmpty()) {
-            lastSeenSequenceNumber = records.get(records.size() - 
1).dynamodb().sequenceNumber();
+            shardIteratorHandler.updateShardIterator(shardId, 
result.nextShardIterator());
+            if (!records.isEmpty()) {
+                lastSeenSequenceNumbers.put(shardId, 
records.get(records.size() - 1).dynamodb().sequenceNumber());
+            }
         }
-
         return processedExchangeCount;
     }
 
@@ -99,42 +112,8 @@ public class Ddb2StreamConsumer extends 
ScheduledBatchPollingConsumer {
         return ex;
     }
 
-    private DynamoDbStreamsClient getClient() {
-        return getEndpoint().getClient();
-    }
-
     @Override
     public Ddb2StreamEndpoint getEndpoint() {
         return (Ddb2StreamEndpoint) super.getEndpoint();
     }
-
-    private Queue<Exchange> createExchanges(List<Record> records, String 
lastSeenSequenceNumber) {
-        Queue<Exchange> exchanges = new ArrayDeque<>();
-        BigIntComparisons condition = null;
-        BigInteger providedSeqNum = null;
-        if (lastSeenSequenceNumber != null) {
-            providedSeqNum = new BigInteger(lastSeenSequenceNumber);
-            condition = BigIntComparisons.Conditions.LT;
-        }
-        switch (getEndpoint().getConfiguration().getIteratorType()) {
-            case AFTER_SEQUENCE_NUMBER:
-                condition = BigIntComparisons.Conditions.LT;
-                providedSeqNum
-                        = new 
BigInteger(getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber());
-                break;
-            case AT_SEQUENCE_NUMBER:
-                condition = BigIntComparisons.Conditions.LTEQ;
-                providedSeqNum
-                        = new 
BigInteger(getEndpoint().getConfiguration().getSequenceNumberProvider().getSequenceNumber());
-                break;
-            default:
-        }
-        for (Record record : records) {
-            BigInteger recordSeqNum = new 
BigInteger(record.dynamodb().sequenceNumber());
-            if (condition == null || condition.matches(providedSeqNum, 
recordSeqNum)) {
-                exchanges.add(createExchange(record));
-            }
-        }
-        return exchanges;
-    }
 }
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
index 5dad187..1259207 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
@@ -95,22 +95,6 @@ public class Ddb2StreamEndpoint extends 
ScheduledPollEndpoint {
         return ddbStreamClient;
     }
 
-    public String getSequenceNumber() {
-        switch (configuration.getIteratorType()) {
-            case AFTER_SEQUENCE_NUMBER:
-            case AT_SEQUENCE_NUMBER:
-                if (null == configuration.getSequenceNumberProvider()) {
-                    throw new IllegalStateException(
-                            "sequenceNumberProvider must be" + " provided, 
either as an implementation of"
-                                                    + " SequenceNumberProvider 
or a literal String.");
-                } else {
-                    return 
configuration.getSequenceNumberProvider().getSequenceNumber();
-                }
-            default:
-                return "";
-        }
-    }
-
     DynamoDbStreamsClient createDdbStreamClient() {
         DynamoDbStreamsClient client = null;
         DynamoDbStreamsClientBuilder clientBuilder = 
DynamoDbStreamsClient.builder();
@@ -161,8 +145,7 @@ public class Ddb2StreamEndpoint extends 
ScheduledPollEndpoint {
     public String toString() {
         return "DdbStreamEndpoint{" + "tableName=" + 
configuration.getTableName()
                + ", amazonDynamoDbStreamsClient=[redacted], 
maxResultsPerRequest="
-               + configuration.getMaxResultsPerRequest() + ", iteratorType=" + 
configuration.getIteratorType()
-               + ", sequenceNumberProvider="
-               + configuration.getSequenceNumberProvider() + ", uri=" + 
getEndpointUri() + '}';
+               + configuration.getMaxResultsPerRequest() + ", 
streamIteratorType=" + configuration.getStreamIteratorType()
+               + ", uri=" + getEndpointUri() + '}';
     }
 }
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java
deleted file mode 100644
index b25fd86..0000000
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/SequenceNumberProvider.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.ddbstream;
-
-public interface SequenceNumberProvider {
-    String getSequenceNumber();
-}
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java
index b3e5940..685ea53 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java
@@ -16,14 +16,17 @@
  */
 package org.apache.camel.component.aws2.ddbstream;
 
-import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
+import 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
 import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest;
 import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
@@ -34,111 +37,108 @@ class ShardIteratorHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(ShardIteratorHandler.class);
 
     private final Ddb2StreamEndpoint endpoint;
-    private final ShardList shardList = new ShardList();
+    private final ShardTree shardTree = new ShardTree();
 
-    private String currentShardIterator;
-    private Shard currentShard;
+    private String streamArn;
+    private Map<String, String> currentShardIterators = new HashMap<>();
 
     ShardIteratorHandler(Ddb2StreamEndpoint endpoint) {
         this.endpoint = endpoint;
     }
 
-    String getShardIterator(String resumeFromSequenceNumber) {
-        ShardIteratorType iteratorType = 
getEndpoint().getConfiguration().getIteratorType();
-        String sequenceNumber = getEndpoint().getSequenceNumber();
-        if (resumeFromSequenceNumber != null) {
-            // Reset things as we're in an error condition.
-            currentShard = null;
-            currentShardIterator = null;
-            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
-            sequenceNumber = resumeFromSequenceNumber;
+    Map<String, String> getShardIterators() {
+        if (streamArn == null) {
+            streamArn = getStreamArn();
         }
-        // either return a cached one or get a new one via a GetShardIterator
-        // request.
-        if (currentShardIterator == null) {
-            ListStreamsResponse streamsListResult = getClient().listStreams(
-                    
ListStreamsRequest.builder().tableName(getEndpoint().getConfiguration().getTableName()).build());
-            if (streamsListResult.streams().isEmpty()) {
-                throw new IllegalArgumentException(
-                        "There is no stream associated with table configured. 
Please create one.");
-            }
-            final String streamArn = 
streamsListResult.streams().get(0).streamArn(); // XXX
-            // assumes
-            // there
-            // is
-            // only
-            // one
-            // stream
+        // Either return cached ones or get new ones via GetShardIterator 
requests.
+        if (currentShardIterators.isEmpty()) {
             DescribeStreamResponse streamDescriptionResult
                     = 
getClient().describeStream(DescribeStreamRequest.builder().streamArn(streamArn).build());
-            
shardList.addAll(streamDescriptionResult.streamDescription().shards());
+            
shardTree.populate(streamDescriptionResult.streamDescription().shards());
 
-            LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
-            if (currentShard == null) {
-                currentShard = resolveNewShard(iteratorType, 
resumeFromSequenceNumber);
-            } else {
-                currentShard = shardList.nextAfter(currentShard);
+            StreamIteratorType streamIteratorType = 
getEndpoint().getConfiguration().getStreamIteratorType();
+            currentShardIterators = 
getCurrentShardIterators(streamIteratorType);
+        } else {
+            Map<String, String> childShardIterators = new HashMap<>();
+            for (Entry<String, String> currentShardIterator : 
currentShardIterators.entrySet()) {
+                List<Shard> children = 
shardTree.getChildren(currentShardIterator.getKey());
+                if (children.isEmpty()) { // This is still an active leaf 
shard, reuse it.
+                    childShardIterators.put(currentShardIterator.getKey(), 
currentShardIterator.getValue());
+                } else {
+                    for (Shard child : children) { // Inactive shard, move 
down to its children.
+                        String shardIterator = 
getShardIterator(child.shardId(), ShardIteratorType.TRIM_HORIZON);
+                        childShardIterators.put(child.shardId(), 
shardIterator);
+                    }
+                }
             }
-            shardList.removeOlderThan(currentShard);
-            LOG.trace("Next shard is: {} (in {})", currentShard, shardList);
+            currentShardIterators = childShardIterators;
+        }
+        LOG.trace("Shard Iterators are: {}", currentShardIterators);
+        return currentShardIterators;
+    }
 
-            GetShardIteratorResponse result
-                    = 
getClient().getShardIterator(buildGetShardIteratorRequest(streamArn, 
iteratorType, sequenceNumber));
-            currentShardIterator = result.shardIterator();
+    void updateShardIterator(String shardId, String nextShardIterator) {
+        if (nextShardIterator == null) { // Shard has become inactive and all 
records have been consumed.
+            currentShardIterators.remove(shardId);
+        } else {
+            currentShardIterators.put(shardId, nextShardIterator);
         }
-        LOG.trace("Shard Iterator is: {}", currentShardIterator);
-        return currentShardIterator;
     }
 
-    private GetShardIteratorRequest buildGetShardIteratorRequest(
-            final String streamArn, ShardIteratorType iteratorType, String 
sequenceNumber) {
-        GetShardIteratorRequest.Builder req = 
GetShardIteratorRequest.builder().streamArn(streamArn)
-                
.shardId(currentShard.shardId()).shardIteratorType(iteratorType);
-        switch (iteratorType) {
-            case AFTER_SEQUENCE_NUMBER:
-            case AT_SEQUENCE_NUMBER:
-                // if you request with a sequence number that is LESS than the
-                // start of the shard, you get a HTTP 400 from AWS.
-                // So only add the sequence number if the endpoints
-                // sequence number is less than or equal to the starting
-                // sequence for the shard.
-                // Otherwise change the shart iterator type to trim_horizon
-                // because we get a 400 when we use one of the
-                // {at,after}_sequence_number iterator types and don't supply
-                // a sequence number.
-                if (BigIntComparisons.Conditions.LTEQ.matches(
-                        new 
BigInteger(currentShard.sequenceNumberRange().startingSequenceNumber()),
-                        new BigInteger(sequenceNumber))) {
-                    req.sequenceNumber(sequenceNumber);
-                } else {
-                    req.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
-                }
-                break;
-            default:
+    String requestFreshShardIterator(String shardId, String 
lastSeenSequenceNumber) {
+        String shardIterator = getShardIterator(shardId, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeenSequenceNumber);
+        currentShardIterators.put(shardId, shardIterator);
+        return shardIterator;
+    }
+
+    Ddb2StreamEndpoint getEndpoint() {
+        return endpoint;
+    }
+
+    private String getStreamArn() {
+        ListStreamsResponse streamsListResult = getClient().listStreams(
+                
ListStreamsRequest.builder().tableName(getEndpoint().getConfiguration().getTableName()).build());
+        if (streamsListResult.streams().isEmpty()) {
+            throw new IllegalArgumentException(
+                    "There is no stream associated with table configured. 
Please create one.");
         }
-        return req.build();
+        return streamsListResult.streams().get(0).streamArn(); // XXX assumes 
there is only one stream
     }
 
-    private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) {
-        switch (type) {
-            case AFTER_SEQUENCE_NUMBER:
-                return shardList.afterSeq(resumeFrom != null ? resumeFrom : 
getEndpoint().getSequenceNumber());
-            case AT_SEQUENCE_NUMBER:
-                return shardList.atSeq(getEndpoint().getSequenceNumber());
-            case TRIM_HORIZON:
-                return shardList.first();
-            case LATEST:
+    private Map<String, String> getCurrentShardIterators(StreamIteratorType 
streamIteratorType) {
+        List<Shard> currentShards;
+        ShardIteratorType shardIteratorType;
+        switch (streamIteratorType) {
+            case FROM_START:
+                currentShards = shardTree.getRoots();
+                shardIteratorType = ShardIteratorType.TRIM_HORIZON;
+                break;
+            case FROM_LATEST:
             default:
-                return shardList.last();
+                currentShards = shardTree.getLeaves();
+                shardIteratorType = ShardIteratorType.LATEST;
+        }
+
+        Map<String, String> shardIterators = new HashMap<>();
+        for (Shard currentShard : currentShards) {
+            String shardIterator = getShardIterator(currentShard.shardId(), 
shardIteratorType);
+            shardIterators.put(currentShard.shardId(), shardIterator);
         }
+        return shardIterators;
     }
 
-    void updateShardIterator(String nextShardIterator) {
-        this.currentShardIterator = nextShardIterator;
+    private String getShardIterator(String shardId, ShardIteratorType 
shardIteratorType) {
+        return getShardIterator(shardId, shardIteratorType, null);
     }
 
-    Ddb2StreamEndpoint getEndpoint() {
-        return endpoint;
+    private String getShardIterator(String shardId, ShardIteratorType 
shardIteratorType, String lastSeenSequenceNumber) {
+        GetShardIteratorRequest request = GetShardIteratorRequest.builder()
+                .streamArn(streamArn)
+                .shardId(shardId)
+                .shardIteratorType(shardIteratorType)
+                .sequenceNumber(lastSeenSequenceNumber)
+                .build();
+        return getClient().getShardIterator(request).shardIterator();
     }
 
     private DynamoDbStreamsClient getClient() {
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardTree.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardTree.java
new file mode 100644
index 0000000..97821cc
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardTree.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+class ShardTree {
+
+    private final Map<String, Shard> shards = new HashMap<>();
+
+    void populate(Collection<Shard> shards) {
+        this.shards.clear();
+        for (Shard shard : shards) {
+            this.shards.put(shard.shardId(), shard);
+        }
+    }
+
+    List<Shard> getRoots() {
+        List<Shard> roots = shards.values()
+                .stream()
+                .filter(s -> !shards.containsKey(s.parentShardId()))
+                .collect(Collectors.toList());
+        if (roots.isEmpty()) {
+            throw new IllegalStateException("Unable to find an unparented 
shard in " + shards);
+        }
+        return roots;
+    }
+
+    List<Shard> getLeaves() {
+        return shards.values()
+                .stream()
+                .filter(s -> s.sequenceNumberRange().endingSequenceNumber() == 
null)
+                .collect(Collectors.toList());
+    }
+
+    List<Shard> getChildren(String shardId) {
+        return shards.values()
+                .stream()
+                .filter(s -> shardId.equals(s.parentShardId()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public String toString() {
+        return "ShardList{" + "shards=" + shards + '}';
+    }
+
+}
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java
deleted file mode 100644
index 0178d1a..0000000
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StringSequenceNumberConverter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.ddbstream;
-
-import org.apache.camel.Converter;
-
-// Allow to ignore this type converter if the ddbstream JARs are not present 
on the classpath
-@Converter(generateLoader = true, ignoreOnLoadError = true)
-public final class StringSequenceNumberConverter {
-
-    private StringSequenceNumberConverter() {
-    }
-
-    @Converter
-    public static SequenceNumberProvider toSequenceNumberProvider(String 
sequenceNumber) {
-        return new StaticSequenceNumberProvider(sequenceNumber);
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java
 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamlessClientMock.java
similarity index 60%
rename from 
components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java
rename to 
components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamlessClientMock.java
index bdf90d5..0cf72f6 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/StaticSequenceNumberProvider.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamlessClientMock.java
@@ -16,16 +16,24 @@
  */
 package org.apache.camel.component.aws2.ddbstream;
 
-public class StaticSequenceNumberProvider implements SequenceNumberProvider {
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest;
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 
-    private final String sequenceNumber;
+class AmazonDDBStreamlessClientMock implements DynamoDbStreamsClient {
 
-    public StaticSequenceNumberProvider(String sequenceNumber) {
-        this.sequenceNumber = sequenceNumber;
+    @Override
+    public ListStreamsResponse listStreams(ListStreamsRequest 
listStreamsRequest) {
+        return ListStreamsResponse.builder().build();
     }
 
     @Override
-    public String getSequenceNumber() {
-        return sequenceNumber;
+    public String serviceName() {
+        return DynamoDbStreamsClient.SERVICE_NAME;
     }
+
+    @Override
+    public void close() {
+    }
+
 }
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamsClientMock.java
 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamsClientMock.java
new file mode 100644
index 0000000..b59d31e
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/AmazonDDBStreamsClientMock.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest;
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.Stream;
+import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+import static 
org.apache.camel.component.aws2.ddbstream.ShardFixtures.STREAM_ARN;
+
+class AmazonDDBStreamsClientMock implements DynamoDbStreamsClient {
+
+    private final Map<Shard, String> shardsToIterators = new HashMap<>();
+
+    @Override
+    public ListStreamsResponse listStreams(ListStreamsRequest 
listStreamsRequest) {
+        return 
ListStreamsResponse.builder().streams(Stream.builder().streamArn(STREAM_ARN).build()).build();
+    }
+
+    @Override
+    public DescribeStreamResponse describeStream(DescribeStreamRequest 
describeStreamRequest) {
+        return DescribeStreamResponse.builder()
+                
.streamDescription(StreamDescription.builder().shards(shardsToIterators.keySet()).build()).build();
+    }
+
+    @Override
+    public GetShardIteratorResponse getShardIterator(GetShardIteratorRequest 
request) {
+        String shardIterator = shardsToIterators.entrySet().stream()
+                .filter(s -> s.getKey().shardId().equals(request.shardId()))
+                .map(Entry::getValue)
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("No mocked 
reponse was configured for " + request.shardId()));
+        return 
GetShardIteratorResponse.builder().shardIterator(shardIterator).build();
+    }
+
+    @Override
+    public String serviceName() {
+        return DynamoDbStreamsClient.SERVICE_NAME;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    void setMockedShardAndIteratorResponse(Shard shard, String iterator) {
+        shardsToIterators.put(shard, iterator);
+    }
+}
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardFixtures.java
 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardFixtures.java
new file mode 100644
index 0000000..5e7a3fb
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardFixtures.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+final class ShardFixtures {
+
+    public static final String STREAM_ARN = 
"arn:aws:dynamodb:eu-west-1:1234:table/some-table/stream/2021-05-07T09:03:40.295";
+
+    //
+    //               shard 0
+    //             /        \
+    //       shard 1         shard 2
+    //      /      \        /       \
+    // shard 3   shard 4  shard 5   shard 6
+    //
+    public static final Shard SHARD_0 = Shard.builder()
+            .shardId("SHARD_0")
+            
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("4100000000019118544662")
+                    .endingSequenceNumber("4100000000019118559460").build())
+            .build();
+    public static final Shard SHARD_1 = Shard.builder()
+            .shardId("SHARD_1")
+            .parentShardId("SHARD_0")
+            
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("52700000000007125283545")
+                    .endingSequenceNumber("52700000000007125283545").build())
+            .build();
+    public static final Shard SHARD_2 = Shard.builder()
+            .shardId("SHARD_2")
+            .parentShardId("SHARD_0")
+            
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("52700000000020262580503")
+                    .endingSequenceNumber("52700000000020262580503").build())
+            .build();
+    public static final Shard SHARD_3 = Shard.builder()
+            .shardId("SHARD_3")
+            .parentShardId("SHARD_1")
+            
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("74100000000005516202603").build())
+            .build();
+    public static final Shard SHARD_4 = Shard.builder()
+            .shardId("SHARD_4")
+            .parentShardId("SHARD_1")
+            
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("74100000000044018483325").build())
+            .build();
+    public static final Shard SHARD_5 = Shard.builder()
+            .shardId("SHARD_5")
+            .parentShardId("SHARD_2")
+            
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("105800000000033207048658").build())
+            .build();
+    public static final Shard SHARD_6 = Shard.builder()
+            .shardId("SHARD_6")
+            .parentShardId("SHARD_2")
+            
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("105800000000025199618049").build())
+            .build();
+
+    private ShardFixtures() {
+        // Utility class, not called.
+    }
+}
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandlerTest.java
 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandlerTest.java
new file mode 100644
index 0000000..7aebb2b
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandlerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import 
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.aws2.ddbstream.ShardFixtures.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShardIteratorHandlerTest extends CamelTestSupport {
+
+    private static final String SHARD_ITERATOR_0 = STREAM_ARN + "|1|hash-0";
+    private static final String SHARD_ITERATOR_1 = STREAM_ARN + "|1|hash-1";
+    private static final String SHARD_ITERATOR_2 = STREAM_ARN + "|1|hash-2";
+    private static final String SHARD_ITERATOR_3 = STREAM_ARN + "|1|hash-3";
+    private static final String SHARD_ITERATOR_4 = STREAM_ARN + "|1|hash-4";
+    private static final String SHARD_ITERATOR_5 = STREAM_ARN + "|1|hash-5";
+    private static final String SHARD_ITERATOR_6 = STREAM_ARN + "|1|hash-6";
+
+    private Ddb2StreamComponent component;
+    private AmazonDDBStreamsClientMock dynamoDbStreamsClient;
+
+    @BeforeEach
+    void setup() throws Exception {
+        component = context.getComponent("aws2-ddbstream", 
Ddb2StreamComponent.class);
+        dynamoDbStreamsClient = new AmazonDDBStreamsClientMock();
+        
component.getConfiguration().setAmazonDynamoDbStreamsClient(dynamoDbStreamsClient);
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_0, 
SHARD_ITERATOR_0);
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_1, 
SHARD_ITERATOR_1);
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_2, 
SHARD_ITERATOR_2);
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_3, 
SHARD_ITERATOR_3);
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_4, 
SHARD_ITERATOR_4);
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_5, 
SHARD_ITERATOR_5);
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_6, 
SHARD_ITERATOR_6);
+    }
+
+    @Test
+    void shouldReturnLeafShardIterators() throws Exception {
+        
component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST);
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) 
component.createEndpoint("aws2-ddbstreams://myTable");
+        ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint);
+        endpoint.doStart();
+
+        Map<String, String> expectedShardIterators = new HashMap<>();
+        expectedShardIterators.put(SHARD_3.shardId(), SHARD_ITERATOR_3);
+        expectedShardIterators.put(SHARD_4.shardId(), SHARD_ITERATOR_4);
+        expectedShardIterators.put(SHARD_5.shardId(), SHARD_ITERATOR_5);
+        expectedShardIterators.put(SHARD_6.shardId(), SHARD_ITERATOR_6);
+        assertEquals(expectedShardIterators, underTest.getShardIterators());
+    }
+
+    @Test
+    void shouldReturnRootShardIterator() throws Exception {
+        
component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_START);
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) 
component.createEndpoint("aws2-ddbstreams://myTable");
+        ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint);
+        endpoint.doStart();
+
+        assertEquals(Collections.singletonMap(SHARD_0.shardId(), 
SHARD_ITERATOR_0), underTest.getShardIterators());
+    }
+
+    @Test
+    void shouldProgressThroughTreeWhenShardIteratorsAreRetrievedRepeatedly() 
throws Exception {
+        
component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_START);
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) 
component.createEndpoint("aws2-ddbstreams://myTable");
+        ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint);
+        endpoint.doStart();
+
+        assertEquals(Collections.singletonMap(SHARD_0.shardId(), 
SHARD_ITERATOR_0), underTest.getShardIterators());
+        Map<String, String> expectedShardIterators1 = new HashMap<>();
+        expectedShardIterators1.put(SHARD_1.shardId(), SHARD_ITERATOR_1);
+        expectedShardIterators1.put(SHARD_2.shardId(), SHARD_ITERATOR_2);
+        assertEquals(expectedShardIterators1, underTest.getShardIterators());
+        Map<String, String> expectedShardIterators2 = new HashMap<>();
+        expectedShardIterators2.put(SHARD_3.shardId(), SHARD_ITERATOR_3);
+        expectedShardIterators2.put(SHARD_4.shardId(), SHARD_ITERATOR_4);
+        expectedShardIterators2.put(SHARD_5.shardId(), SHARD_ITERATOR_5);
+        expectedShardIterators2.put(SHARD_6.shardId(), SHARD_ITERATOR_6);
+        assertEquals(expectedShardIterators2, underTest.getShardIterators());
+        Map<String, String> expectedShardIterators3 = expectedShardIterators2;
+        assertEquals(expectedShardIterators3, underTest.getShardIterators());
+    }
+
+    @Test
+    void shouldUpdateShardIterator() throws Exception {
+        
component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST);
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) 
component.createEndpoint("aws2-ddbstreams://myTable");
+        ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint);
+        endpoint.doStart();
+
+        underTest.getShardIterators();
+        String updatedShardIterator5 = STREAM_ARN + "|1|hash-5-new";
+        underTest.updateShardIterator(SHARD_5.shardId(), 
updatedShardIterator5);
+
+        assertEquals(updatedShardIterator5, 
underTest.getShardIterators().get(SHARD_5.shardId()));
+    }
+
+    @Test
+    void shouldRemoveShardIfNullIteratorIsProvided() throws Exception {
+        
component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST);
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) 
component.createEndpoint("aws2-ddbstreams://myTable");
+        ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint);
+        endpoint.doStart();
+
+        underTest.getShardIterators();
+        underTest.updateShardIterator(SHARD_3.shardId(), null);
+
+        
assertFalse(underTest.getShardIterators().containsKey(SHARD_3.shardId()));
+    }
+
+    @Test
+    void shouldRequestAndCacheFreshShardIterator() throws Exception {
+        
component.getConfiguration().setStreamIteratorType(StreamIteratorType.FROM_LATEST);
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) 
component.createEndpoint("aws2-ddbstreams://myTable");
+        ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint);
+        endpoint.doStart();
+
+        underTest.getShardIterators();
+        String freshShardIterator4 = STREAM_ARN + "|1|hash-4-fresh";
+        dynamoDbStreamsClient.setMockedShardAndIteratorResponse(SHARD_4, 
freshShardIterator4);
+
+        assertEquals(freshShardIterator4, 
underTest.requestFreshShardIterator(SHARD_4.shardId(), freshShardIterator4));
+        assertEquals(freshShardIterator4, 
underTest.getShardIterators().get(SHARD_4.shardId()));
+    }
+
+    @Test
+    void shouldThrowIllegalArgumentExceptionIfNoStreamsAreReturned() throws 
Exception {
+        AmazonDDBStreamlessClientMock dynamoDbStreamsClient = new 
AmazonDDBStreamlessClientMock();
+        
component.getConfiguration().setAmazonDynamoDbStreamsClient(dynamoDbStreamsClient);
+
+        Ddb2StreamEndpoint endpoint = (Ddb2StreamEndpoint) 
component.createEndpoint("aws2-ddbstreams://myTable");
+        ShardIteratorHandler underTest = new ShardIteratorHandler(endpoint);
+        endpoint.doStart();
+
+        assertThrows(IllegalArgumentException.class, () -> 
underTest.getShardIterators());
+    }
+
+}
diff --git 
a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardTreeTest.java
 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardTreeTest.java
new file mode 100644
index 0000000..f2a84a4
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/ShardTreeTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.ddbstream;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import static org.apache.camel.component.aws2.ddbstream.ShardFixtures.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShardTreeTest {
+
+    private final ShardTree underTest = new ShardTree();
+
+    @Test
+    void shouldGetLeafShards() {
+        underTest.populate(Arrays.asList(SHARD_0, SHARD_1, SHARD_2, SHARD_3, 
SHARD_4, SHARD_5, SHARD_6));
+
+        assertEquals(Arrays.asList(SHARD_5, SHARD_6, SHARD_3, SHARD_4), 
underTest.getLeaves());
+    }
+
+    @Test
+    void shouldReturnEmptyListIfAllShardsHaveAnEndingSequenceNumber() {
+        underTest.populate(Arrays.asList(SHARD_0, SHARD_1, SHARD_2));
+
+        assertEquals(Arrays.asList(), underTest.getLeaves());
+    }
+
+    @Test
+    void shouldGetRootShards() {
+        underTest.populate(Arrays.asList(SHARD_1, SHARD_2, SHARD_3, SHARD_4, 
SHARD_5, SHARD_6));
+
+        assertEquals(Arrays.asList(SHARD_1, SHARD_2), underTest.getRoots());
+    }
+
+    @Test
+    void shouldThrowIfNoUnparentedShardsCanBeFound() {
+        Shard selfParentingShard = 
Shard.builder().shardId("SHARD_X").parentShardId("SHARD_X").build();
+        underTest.populate(Arrays.asList(selfParentingShard));
+
+        assertThrows(IllegalStateException.class, () -> underTest.getRoots());
+    }
+
+    @Test
+    void shouldGetChildShards() {
+        underTest.populate(Arrays.asList(SHARD_0, SHARD_1, SHARD_2, SHARD_3, 
SHARD_4, SHARD_5, SHARD_6));
+
+        assertEquals(Arrays.asList(SHARD_5, SHARD_6), 
underTest.getChildren("SHARD_2"));
+    }
+
+    @Test
+    void shouldReturnEmptyListIfTheShardHasNoChildren() {
+        underTest.populate(Arrays.asList(SHARD_6));
+
+        assertEquals(Arrays.asList(), underTest.getChildren("SHARD_6"));
+    }
+
+}
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java
index 9242576..3d7eb3a 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2DdbstreamComponentBuilderFactory.java
@@ -106,26 +106,6 @@ public interface Aws2DdbstreamComponentBuilderFactory {
             return this;
         }
         /**
-         * Defines where in the DynaboDB stream to start getting records. Note
-         * that using TRIM_HORIZON can cause a significant delay before the
-         * stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are
-         * used, then a sequenceNumberProvider MUST be supplied.
-         * 
-         * The option is a:
-         * 
&lt;code&gt;software.amazon.awssdk.services.dynamodb.model.ShardIteratorType&lt;/code&gt;
 type.
-         * 
-         * Default: LATEST
-         * Group: consumer
-         * 
-         * @param iteratorType the value to set
-         * @return the dsl builder
-         */
-        default Aws2DdbstreamComponentBuilder iteratorType(
-                
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType iteratorType) {
-            doSetProperty("iteratorType", iteratorType);
-            return this;
-        }
-        /**
          * Maximum number of records that will be fetched in each poll.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -219,21 +199,22 @@ public interface Aws2DdbstreamComponentBuilderFactory {
             return this;
         }
         /**
-         * Provider for the sequence number when using one of the two
-         * ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be 
a
-         * registry reference or a literal sequence number.
+         * Defines where in the DynamoDB stream to start getting records. Note
+         * that using FROM_START can cause a significant delay before the 
stream
+         * has caught up to real-time.
          * 
          * The option is a:
-         * 
&lt;code&gt;org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType&lt;/code&gt;
 type.
          * 
+         * Default: FROM_LATEST
          * Group: consumer
          * 
-         * @param sequenceNumberProvider the value to set
+         * @param streamIteratorType the value to set
          * @return the dsl builder
          */
-        default Aws2DdbstreamComponentBuilder sequenceNumberProvider(
-                
org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider 
sequenceNumberProvider) {
-            doSetProperty("sequenceNumberProvider", sequenceNumberProvider);
+        default Aws2DdbstreamComponentBuilder streamIteratorType(
+                
org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType
 streamIteratorType) {
+            doSetProperty("streamIteratorType", streamIteratorType);
             return this;
         }
         /**
@@ -365,14 +346,13 @@ public interface Aws2DdbstreamComponentBuilderFactory {
             case "amazonDynamoDbStreamsClient": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setAmazonDynamoDbStreamsClient((software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient)
 value); return true;
             case "bridgeErrorHandler": ((Ddb2StreamComponent) 
component).setBridgeErrorHandler((boolean) value); return true;
             case "configuration": ((Ddb2StreamComponent) 
component).setConfiguration((org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration)
 value); return true;
-            case "iteratorType": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setIteratorType((software.amazon.awssdk.services.dynamodb.model.ShardIteratorType)
 value); return true;
             case "maxResultsPerRequest": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setMaxResultsPerRequest((int) value); return true;
             case "overrideEndpoint": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setOverrideEndpoint((boolean) value); return true;
             case "proxyHost": getOrCreateConfiguration((Ddb2StreamComponent) 
component).setProxyHost((java.lang.String) value); return true;
             case "proxyPort": getOrCreateConfiguration((Ddb2StreamComponent) 
component).setProxyPort((java.lang.Integer) value); return true;
             case "proxyProtocol": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setProxyProtocol((software.amazon.awssdk.core.Protocol) value); 
return true;
             case "region": getOrCreateConfiguration((Ddb2StreamComponent) 
component).setRegion((java.lang.String) value); return true;
-            case "sequenceNumberProvider": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setSequenceNumberProvider((org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider)
 value); return true;
+            case "streamIteratorType": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setStreamIteratorType((org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration.StreamIteratorType)
 value); return true;
             case "trustAllCertificates": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setTrustAllCertificates((boolean) value); return true;
             case "uriEndpointOverride": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setUriEndpointOverride((java.lang.String) value); return true;
             case "useDefaultCredentialsProvider": 
getOrCreateConfiguration((Ddb2StreamComponent) 
component).setUseDefaultCredentialsProvider((boolean) value); return true;
diff --git 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java
index 7b2a3d5..471ed16 100644
--- 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Ddb2StreamEndpointBuilderFactory.java
@@ -124,45 +124,6 @@ public interface Ddb2StreamEndpointBuilderFactory {
             return this;
         }
         /**
-         * Defines where in the DynaboDB stream to start getting records. Note
-         * that using TRIM_HORIZON can cause a significant delay before the
-         * stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are
-         * used, then a sequenceNumberProvider MUST be supplied.
-         * 
-         * The option is a:
-         * 
&lt;code&gt;software.amazon.awssdk.services.dynamodb.model.ShardIteratorType&lt;/code&gt;
 type.
-         * 
-         * Default: LATEST
-         * Group: consumer
-         * 
-         * @param iteratorType the value to set
-         * @return the dsl builder
-         */
-        default Ddb2StreamEndpointBuilder iteratorType(
-                ShardIteratorType iteratorType) {
-            doSetProperty("iteratorType", iteratorType);
-            return this;
-        }
-        /**
-         * Defines where in the DynaboDB stream to start getting records. Note
-         * that using TRIM_HORIZON can cause a significant delay before the
-         * stream has caught up to real-time. if {AT,AFTER}_SEQUENCE_NUMBER are
-         * used, then a sequenceNumberProvider MUST be supplied.
-         * 
-         * The option will be converted to a
-         * 
&lt;code&gt;software.amazon.awssdk.services.dynamodb.model.ShardIteratorType&lt;/code&gt;
 type.
-         * 
-         * Default: LATEST
-         * Group: consumer
-         * 
-         * @param iteratorType the value to set
-         * @return the dsl builder
-         */
-        default Ddb2StreamEndpointBuilder iteratorType(String iteratorType) {
-            doSetProperty("iteratorType", iteratorType);
-            return this;
-        }
-        /**
          * Maximum number of records that will be fetched in each poll.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -352,39 +313,41 @@ public interface Ddb2StreamEndpointBuilderFactory {
             return this;
         }
         /**
-         * Provider for the sequence number when using one of the two
-         * ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be 
a
-         * registry reference or a literal sequence number.
+         * Defines where in the DynamoDB stream to start getting records. Note
+         * that using FROM_START can cause a significant delay before the 
stream
+         * has caught up to real-time.
          * 
          * The option is a:
-         * 
&lt;code&gt;org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration$StreamIteratorType&lt;/code&gt;
 type.
          * 
+         * Default: FROM_LATEST
          * Group: consumer
          * 
-         * @param sequenceNumberProvider the value to set
+         * @param streamIteratorType the value to set
          * @return the dsl builder
          */
-        default Ddb2StreamEndpointBuilder sequenceNumberProvider(
-                Object sequenceNumberProvider) {
-            doSetProperty("sequenceNumberProvider", sequenceNumberProvider);
+        default Ddb2StreamEndpointBuilder streamIteratorType(
+                StreamIteratorType streamIteratorType) {
+            doSetProperty("streamIteratorType", streamIteratorType);
             return this;
         }
         /**
-         * Provider for the sequence number when using one of the two
-         * ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER iterator types. Can be 
a
-         * registry reference or a literal sequence number.
+         * Defines where in the DynamoDB stream to start getting records. Note
+         * that using FROM_START can cause a significant delay before the 
stream
+         * has caught up to real-time.
          * 
          * The option will be converted to a
-         * 
&lt;code&gt;org.apache.camel.component.aws2.ddbstream.SequenceNumberProvider&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration$StreamIteratorType&lt;/code&gt;
 type.
          * 
+         * Default: FROM_LATEST
          * Group: consumer
          * 
-         * @param sequenceNumberProvider the value to set
+         * @param streamIteratorType the value to set
          * @return the dsl builder
          */
-        default Ddb2StreamEndpointBuilder sequenceNumberProvider(
-                String sequenceNumberProvider) {
-            doSetProperty("sequenceNumberProvider", sequenceNumberProvider);
+        default Ddb2StreamEndpointBuilder streamIteratorType(
+                String streamIteratorType) {
+            doSetProperty("streamIteratorType", streamIteratorType);
             return this;
         }
         /**
@@ -1097,18 +1060,6 @@ public interface Ddb2StreamEndpointBuilderFactory {
     }
 
     /**
-     * Proxy enum for
-     * 
<code>software.amazon.awssdk.services.dynamodb.model.ShardIteratorType</code> 
enum.
-     */
-    enum ShardIteratorType {
-        TRIM_HORIZON,
-        LATEST,
-        AT_SEQUENCE_NUMBER,
-        AFTER_SEQUENCE_NUMBER,
-        UNKNOWN_TO_SDK_VERSION;
-    }
-
-    /**
      * Proxy enum for <code>software.amazon.awssdk.core.Protocol</code> enum.
      */
     enum Protocol {
@@ -1116,6 +1067,15 @@ public interface Ddb2StreamEndpointBuilderFactory {
         HTTPS;
     }
 
+    /**
+     * Proxy enum for
+     * 
<code>org.apache.camel.component.aws2.ddbstream.Ddb2StreamConfiguration$StreamIteratorType</code>
 enum.
+     */
+    enum StreamIteratorType {
+        FROM_LATEST,
+        FROM_START;
+    }
+
     public interface Ddb2StreamBuilders {
         /**
          * AWS DynamoDB Streams (camel-aws2-ddb)

Reply via email to