This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 46e003c436c CAMEL-20840: fixed incomplete/incorrect implementation of 
resume adapter
46e003c436c is described below

commit 46e003c436c6d90fa765403a3b512ed1c73c0168
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Tue Jun 11 11:26:40 2024 +0200

    CAMEL-20840: fixed incomplete/incorrect implementation of resume adapter
    
    - removed invalid test
    - adjust Kinesis resume adapter to use the ResumeActionAware
    - simplify Kinesis resume adapter
---
 .../camel/catalog/components/aws2-kinesis.json     |   3 +-
 .../camel/component/aws2/kinesis/aws2-kinesis.json |   3 +-
 .../services/org/apache/camel/adapter-factory      |   2 -
 .../component/aws2/kinesis/Kinesis2Constants.java  |   3 +
 .../component/aws2/kinesis/Kinesis2Consumer.java   |  25 ++++-
 .../consumer/KinesisDefaultResumeAdapter.java      |  81 ----------------
 .../aws2/kinesis/consumer/KinesisResumeAction.java |  68 +++++++++++++
 .../kinesis/consumer/KinesisResumeAdapter.java     |  33 -------
 .../KinesisConsumerResumeAfterRestartIT.java       | 106 ---------------------
 .../integration/KinesisConsumerResumeIT.java       |  37 +++----
 .../support/resume/ResumeActionAwareAdapter.java   |   4 +
 .../dsl/Kinesis2EndpointBuilderFactory.java        |  12 +++
 12 files changed, 134 insertions(+), 243 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
index 8f1a0065ed7..2c45648f61e 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
@@ -58,7 +58,8 @@
     "CamelAwsKinesisApproximateArrivalTimestamp": { "index": 1, "kind": 
"header", "displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The time AWS assigned as the arrival 
time of the record.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#APPROX_ARRIVAL_TIME" 
},
     "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", 
"displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Identifies which shard in the stream 
the data record is assigned to.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" },
     "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "long", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" },
-    "CamelAwsKinesisShardId": { "index": 4, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
+    "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The resume action to execute when 
resuming.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" },
+    "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
   },
   "properties": {
     "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", 
"group": "common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Name of the stream" },
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
index 8f1a0065ed7..2c45648f61e 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
@@ -58,7 +58,8 @@
     "CamelAwsKinesisApproximateArrivalTimestamp": { "index": 1, "kind": 
"header", "displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The time AWS assigned as the arrival 
time of the record.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#APPROX_ARRIVAL_TIME" 
},
     "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", 
"displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Identifies which shard in the stream 
the data record is assigned to.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" },
     "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "long", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" },
-    "CamelAwsKinesisShardId": { "index": 4, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
+    "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The resume action to execute when 
resuming.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" },
+    "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
   },
   "properties": {
     "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", 
"group": "common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Name of the stream" },
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory
deleted file mode 100644
index 0e7c6193aee..00000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory
+++ /dev/null
@@ -1,2 +0,0 @@
-# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.aws2.kinesis.consumer.KinesisDefaultResumeAdapter
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
index 62abfc4d5d3..78b6db63a1d 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
@@ -32,6 +32,9 @@ public interface Kinesis2Constants {
     @Metadata(description = "The timestamp of the message", javaType = "long")
     String MESSAGE_TIMESTAMP = Exchange.MESSAGE_TIMESTAMP;
 
+    @Metadata(label = "consumer", description = "The resume action to execute 
when resuming.", javaType = "String")
+    String RESUME_ACTION = "CamelKinesisDbResumeAction";
+
     /**
      * in a Kinesis Record object, the shard ID is used on writes to indicate 
where the data was stored
      */
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 9c5c86eb0d6..2b94b9da034 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -29,7 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAction;
+import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
@@ -287,14 +289,31 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer implements R
             return;
         }
 
-        KinesisResumeAdapter adapter = 
resumeStrategy.getAdapter(KinesisResumeAdapter.class);
+        ResumeActionAware adapter = 
resumeStrategy.getAdapter(ResumeActionAware.class);
         if (adapter == null) {
             LOG.warn("There is a resume strategy setup, but no adapter 
configured or the type is incorrect");
 
             return;
         }
 
-        adapter.configureGetShardIteratorRequest(req, 
getEndpoint().getConfiguration().getStreamName(), shardId);
+        final ResumeAction action = resolveResumeAction(shardId, req);
+        adapter.setResumeAction(action);
+        adapter.resume();
+    }
+
+    private KinesisResumeAction resolveResumeAction(String shardId, 
GetShardIteratorRequest.Builder req) {
+        KinesisResumeAction action
+                = 
getEndpoint().getCamelContext().getRegistry().lookupByNameAndType(Kinesis2Constants.RESUME_ACTION,
+                        KinesisResumeAction.class);
+        if (action == null) {
+            action = new KinesisResumeAction(req);
+        } else {
+            action.setBuilder(req);
+        }
+
+        action.setShardId(shardId);
+        action.setStreamName(getEndpoint().getConfiguration().getStreamName());
+        return action;
     }
 
     private Queue<Exchange> createExchanges(Shard shard, List<Record> records) 
{
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
deleted file mode 100644
index 8a1c47a2cf3..00000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.aws2.kinesis.consumer;
-
-import org.apache.camel.resume.Cacheable;
-import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.OffsetKey;
-import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.resume.cache.ResumeCache;
-import org.apache.camel.spi.annotations.JdkService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-
-@JdkService(ResumeAdapter.RESUME_ADAPTER_FACTORY)
-public class KinesisDefaultResumeAdapter implements KinesisResumeAdapter, 
Cacheable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDefaultResumeAdapter.class);
-
-    private ResumeCache<String> cache;
-
-    @Override
-    public void resume() {
-        throw new UnsupportedOperationException();
-    }
-
-    private void add(Object key, Object offset) {
-        KinesisOffset ko = (KinesisOffset) cache.computeIfAbsent((String) key, 
k -> new KinesisOffset());
-
-        ko.update((String) offset);
-    }
-
-    @Override
-    public boolean add(OffsetKey<?> key, Offset<?> offset) {
-        add(key.getValue(), offset.getValue());
-
-        return true;
-    }
-
-    @Override
-    public void setCache(ResumeCache<?> cache) {
-        this.cache = (ResumeCache<String>) cache;
-    }
-
-    @Override
-    public ResumeCache<?> getCache() {
-        return cache;
-    }
-
-    @Override
-    public void 
configureGetShardIteratorRequest(GetShardIteratorRequest.Builder builder, 
String streamName, String shardId) {
-        KinesisOffset offset = cache.get(shardId, KinesisOffset.class);
-
-        if (offset == null) {
-            LOG.info("There is no offset for the stream {}", streamName);
-            return;
-        }
-
-        final String sequenceNumber = offset.getValue();
-        LOG.info("Resuming from offset {} for key {}", sequenceNumber, 
streamName);
-
-        builder.shardId(shardId);
-        builder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
-        builder.startingSequenceNumber(sequenceNumber);
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java
new file mode 100644
index 00000000000..11dcab57ae7
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import org.apache.camel.resume.ResumeAction;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+public class KinesisResumeAction implements ResumeAction {
+
+    private GetShardIteratorRequest.Builder builder;
+    private String streamName;
+    private String shardId;
+
+    public KinesisResumeAction() {
+    }
+
+    public KinesisResumeAction(GetShardIteratorRequest.Builder builder) {
+        this.builder = builder;
+    }
+
+    public void setBuilder(GetShardIteratorRequest.Builder builder) {
+        this.builder = builder;
+    }
+
+    protected GetShardIteratorRequest.Builder getBuilder() {
+        return builder;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public void setStreamName(String streamName) {
+        this.streamName = streamName;
+    }
+
+    public String getShardId() {
+        return shardId;
+    }
+
+    public void setShardId(String shardId) {
+        this.shardId = shardId;
+    }
+
+    @Override
+    public boolean evalEntry(Object shardId, Object sequenceNumber) {
+        builder.shardId((String) shardId);
+        builder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+        builder.startingSequenceNumber((String) sequenceNumber);
+        return false;
+    }
+}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
deleted file mode 100644
index a9f031f1866..00000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.aws2.kinesis.consumer;
-
-import org.apache.camel.resume.ResumeAdapter;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-
-/**
- * The resume adapter for Kinesis
- */
-public interface KinesisResumeAdapter extends ResumeAdapter {
-    /*
-        When consuming from multiple shards the KinesisResumeAdapter is 
potentially accessed by multiple threads.
-        To avoid any concurrency issues the configuration of the 
GetShardIteratorRequest should be done in one operation
-        and not using multiple calls like in the previous version of this 
interface
-     */
-    void configureGetShardIteratorRequest(GetShardIteratorRequest.Builder 
builder, String streamName, String shardId);
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java
 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java
deleted file mode 100644
index 41a9b10efbf..00000000000
--- 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.integration;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.camel.builder.RouteBuilder;
-import 
org.apache.camel.component.aws2.kinesis.consumer.KinesisConsumerOffsetProcessor;
-import 
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategyConfiguration;
-import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.cache.ResumeCache;
-import org.apache.camel.test.infra.aws.common.services.AWSService;
-import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
-import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.camel.test.junit5.CamelTestSupport;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
-
-import static 
org.apache.camel.test.infra.aws2.clients.KinesisUtils.createStream;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class KinesisConsumerResumeAfterRestartIT extends CamelTestSupport {
-
-    @RegisterExtension
-    public static AWSService awsService = 
AWSServiceFactory.createSingletonKinesisService();
-    private KinesisClient client;
-
-    String streamName = "my-stream";
-
-    List<String> receivedMessages = new CopyOnWriteArrayList<>();
-
-    ResumeCache resumeCache = TransientResumeStrategy.createSimpleCache();
-
-    @Override
-    protected RouteBuilder createRouteBuilder() {
-        client = AWSSDKClientUtils.newKinesisClient();
-
-        context.getRegistry().bind("amazonKinesisClient", 
AWSSDKClientUtils.newKinesisClient());
-
-        return new RouteBuilder() {
-            @Override
-            public void configure() {
-                String kinesisEndpointUri = 
"aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
-
-                fromF(kinesisEndpointUri, streamName)
-                        // commenting out the strategy will cause the test to 
fail as event "First" will be consumed twice
-                        
.resumable().configuration(KinesisResumeStrategyConfiguration.builder().withResumeCache(resumeCache))
-                        .process(new KinesisConsumerOffsetProcessor())
-                        .process(exchange -> 
receivedMessages.add(exchange.getMessage().getBody(String.class)));
-            }
-        };
-    }
-
-    @BeforeEach
-    public void prepareEnvironment() {
-        createStream(client, streamName);
-    }
-
-    private void sendEvent(String payload) {
-        
client.putRecord(PutRecordRequest.builder().streamName(streamName).partitionKey("my-key")
-                .data(SdkBytes.fromUtf8String(payload)).build());
-    }
-
-    @Test
-    void shouldResumeConsumptionAfterRestart() {
-
-        sendEvent("First");
-        Awaitility.await().until(() -> receivedMessages.contains("First"));
-
-        restartContext();
-
-        sendEvent("Second");
-        Awaitility.await().until(() -> receivedMessages.contains("Second"));
-
-        assertEquals(2, receivedMessages.size());
-    }
-
-    private void restartContext() {
-        context.stop();
-
-        // stop also seems to close the kinesis client, therefor we need to 
provide a new one
-        context.getRegistry().bind("amazonKinesisClient", 
AWSSDKClientUtils.newKinesisClient());
-
-        context.start();
-    }
-}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
index 1868e7ad251..ce34c29bee4 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -25,9 +25,11 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAction;
+import 
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategyConfiguration;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -76,29 +78,26 @@ public class KinesisConsumerResumeIT extends 
CamelTestSupport {
         }
     }
 
-    private static final class TestKinesisResumeAdapter implements 
KinesisResumeAdapter {
+    private static final class TestResumeAction extends KinesisResumeAction {
         private List<PutRecordsResponse> previousRecords;
         private final int expectedCount;
-        private GetShardIteratorRequest.Builder builder;
 
-        private TestKinesisResumeAdapter(int expectedCount) {
+        private TestResumeAction(int expectedCount) {
             this.expectedCount = expectedCount;
         }
 
-        @Override
-        public void resume() {
-        }
-
         public void setPreviousRecords(List<PutRecordsResponse> 
previousRecords) {
             this.previousRecords = previousRecords;
         }
 
+        public int getExpectedCount() {
+            return expectedCount;
+        }
+
         @Override
-        public void configureGetShardIteratorRequest(
-                GetShardIteratorRequest.Builder builder, String streamName, 
String shardId) {
+        public boolean evalEntry(Object shardId, Object sequenceNumber) {
+            final GetShardIteratorRequest.Builder builder = super.getBuilder();
             ObjectHelper.notNull(builder, "builder");
-            ObjectHelper.notNull(streamName, "streamName");
-            ObjectHelper.notNull(shardId, "shardId");
 
             LOG.debug("Waiting for data");
             Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
!previousRecords.isEmpty());
@@ -109,6 +108,7 @@ public class KinesisConsumerResumeIT extends 
CamelTestSupport {
 
             
builder.startingSequenceNumber(putRecordsResultEntry.sequenceNumber());
             builder.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+            return false;
         }
     }
 
@@ -126,7 +126,7 @@ public class KinesisConsumerResumeIT extends 
CamelTestSupport {
     private final int expectedCount = messageCount / 2;
     private List<KinesisData> receivedMessages = new CopyOnWriteArrayList<>();
     private List<PutRecordsResponse> previousRecords;
-    private TestKinesisResumeAdapter adapter = new 
TestKinesisResumeAdapter(expectedCount);
+    private TestResumeAction action = new TestResumeAction(expectedCount);
 
     @Override
     protected RouteBuilder createRouteBuilder() {
@@ -137,11 +137,17 @@ public class KinesisConsumerResumeIT extends 
CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                bindToRegistry("testResumeStrategy", new 
TransientResumeStrategy(adapter));
+                final ResumeCache<Object> simpleCache = 
TransientResumeStrategy.createSimpleCache();
+                final 
KinesisResumeStrategyConfiguration.KinesisResumeStrategyConfigurationBuilder 
resumeConfigurationBuilder
+                        = KinesisResumeStrategyConfiguration.builder()
+                                .withResumeCache(simpleCache);
+
+                bindToRegistry(Kinesis2Constants.RESUME_ACTION, action);
 
                 String kinesisEndpointUri = 
"aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
 
                 fromF(kinesisEndpointUri, streamName)
+                        .resumable().configuration(resumeConfigurationBuilder)
                         .process(exchange -> {
                             KinesisData data = new KinesisData();
                             final Message message = exchange.getMessage();
@@ -153,7 +159,6 @@ public class KinesisConsumerResumeIT extends 
CamelTestSupport {
 
                             receivedMessages.add(data);
                         })
-                        .resumable("testResumeStrategy")
                         .to("mock:result");
             }
         };
@@ -172,7 +177,7 @@ public class KinesisConsumerResumeIT extends 
CamelTestSupport {
             }
         }
 
-        adapter.setPreviousRecords(previousRecords);
+        action.setPreviousRecords(previousRecords);
     }
 
     @AfterEach
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
index 563f64c8889..a273c30ee1d 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
@@ -78,4 +78,8 @@ public class ResumeActionAwareAdapter implements 
ResumeActionAware, Cacheable, D
 
         return add(key, value);
     }
+
+    protected ResumeAction getResumeAction() {
+        return resumeAction;
+    }
 }
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
index 5188a698bd9..f11004307de 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
@@ -2332,6 +2332,18 @@ public interface Kinesis2EndpointBuilderFactory {
         public String messageTimestamp() {
             return "CamelMessageTimestamp";
         }
+        /**
+         * The resume action to execute when resuming.
+         * 
+         * The option is a: {@code String} type.
+         * 
+         * Group: consumer
+         * 
+         * @return the name of the header {@code KinesisDbResumeAction}.
+         */
+        public String kinesisDbResumeAction() {
+            return "CamelKinesisDbResumeAction";
+        }
         /**
          * The shard ID of the shard where the data record was placed.
          * 

Reply via email to