This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 46e003c436c CAMEL-20840: fixed incomplete/incorrect implementation of resume adapter 46e003c436c is described below commit 46e003c436c6d90fa765403a3b512ed1c73c0168 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Tue Jun 11 11:26:40 2024 +0200 CAMEL-20840: fixed incomplete/incorrect implementation of resume adapter - removed invalid test - adjust Kinesis resume adapter to use the ResumeActionAware - simplify Kinesis resume adapter --- .../camel/catalog/components/aws2-kinesis.json | 3 +- .../camel/component/aws2/kinesis/aws2-kinesis.json | 3 +- .../services/org/apache/camel/adapter-factory | 2 - .../component/aws2/kinesis/Kinesis2Constants.java | 3 + .../component/aws2/kinesis/Kinesis2Consumer.java | 25 ++++- .../consumer/KinesisDefaultResumeAdapter.java | 81 ---------------- .../aws2/kinesis/consumer/KinesisResumeAction.java | 68 +++++++++++++ .../kinesis/consumer/KinesisResumeAdapter.java | 33 ------- .../KinesisConsumerResumeAfterRestartIT.java | 106 --------------------- .../integration/KinesisConsumerResumeIT.java | 37 +++---- .../support/resume/ResumeActionAwareAdapter.java | 4 + .../dsl/Kinesis2EndpointBuilderFactory.java | 12 +++ 12 files changed, 134 insertions(+), 243 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json index 8f1a0065ed7..2c45648f61e 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json @@ -58,7 +58,8 @@ "CamelAwsKinesisApproximateArrivalTimestamp": { "index": 1, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time AWS assigned as the arrival time of the record.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#APPROX_ARRIVAL_TIME" }, "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Identifies which shard in the stream the data record is assigned to.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" }, "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" }, - "CamelAwsKinesisShardId": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" } + "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" }, + "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" } }, "properties": { "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Name of the stream" }, diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json index 8f1a0065ed7..2c45648f61e 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json @@ -58,7 +58,8 @@ "CamelAwsKinesisApproximateArrivalTimestamp": { "index": 1, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time AWS assigned as the arrival time of the record.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#APPROX_ARRIVAL_TIME" }, "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Identifies which shard in the stream the data record is assigned to.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" }, "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" }, - "CamelAwsKinesisShardId": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" } + "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" }, + "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" } }, "properties": { "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Name of the stream" }, diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory deleted file mode 100644 index 0e7c6193aee..00000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory +++ /dev/null @@ -1,2 +0,0 @@ -# Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.component.aws2.kinesis.consumer.KinesisDefaultResumeAdapter diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java index 62abfc4d5d3..78b6db63a1d 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java @@ -32,6 +32,9 @@ public interface Kinesis2Constants { @Metadata(description = "The timestamp of the message", javaType = "long") String MESSAGE_TIMESTAMP = Exchange.MESSAGE_TIMESTAMP; + @Metadata(label = "consumer", description = "The resume action to execute when resuming.", javaType = "String") + String RESUME_ACTION = "CamelKinesisDbResumeAction"; + /** * in a Kinesis Record object, the shard ID is used on writes to indicate where the data was stored */ diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 9c5c86eb0d6..2b94b9da034 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -29,7 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAction; +import org.apache.camel.resume.ResumeAction; +import org.apache.camel.resume.ResumeActionAware; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; import org.apache.camel.support.ScheduledBatchPollingConsumer; @@ -287,14 +289,31 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R return; } - KinesisResumeAdapter adapter = resumeStrategy.getAdapter(KinesisResumeAdapter.class); + ResumeActionAware adapter = resumeStrategy.getAdapter(ResumeActionAware.class); if (adapter == null) { LOG.warn("There is a resume strategy setup, but no adapter configured or the type is incorrect"); return; } - adapter.configureGetShardIteratorRequest(req, getEndpoint().getConfiguration().getStreamName(), shardId); + final ResumeAction action = resolveResumeAction(shardId, req); + adapter.setResumeAction(action); + adapter.resume(); + } + + private KinesisResumeAction resolveResumeAction(String shardId, GetShardIteratorRequest.Builder req) { + KinesisResumeAction action + = getEndpoint().getCamelContext().getRegistry().lookupByNameAndType(Kinesis2Constants.RESUME_ACTION, + KinesisResumeAction.class); + if (action == null) { + action = new KinesisResumeAction(req); + } else { + action.setBuilder(req); + } + + action.setShardId(shardId); + action.setStreamName(getEndpoint().getConfiguration().getStreamName()); + return action; } private Queue<Exchange> createExchanges(Shard shard, List<Record> records) { diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java deleted file mode 100644 index 8a1c47a2cf3..00000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.aws2.kinesis.consumer; - -import org.apache.camel.resume.Cacheable; -import org.apache.camel.resume.Offset; -import org.apache.camel.resume.OffsetKey; -import org.apache.camel.resume.ResumeAdapter; -import org.apache.camel.resume.cache.ResumeCache; -import org.apache.camel.spi.annotations.JdkService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; - -@JdkService(ResumeAdapter.RESUME_ADAPTER_FACTORY) -public class KinesisDefaultResumeAdapter implements KinesisResumeAdapter, Cacheable { - private static final Logger LOG = LoggerFactory.getLogger(KinesisDefaultResumeAdapter.class); - - private ResumeCache<String> cache; - - @Override - public void resume() { - throw new UnsupportedOperationException(); - } - - private void add(Object key, Object offset) { - KinesisOffset ko = (KinesisOffset) cache.computeIfAbsent((String) key, k -> new KinesisOffset()); - - ko.update((String) offset); - } - - @Override - public boolean add(OffsetKey<?> key, Offset<?> offset) { - add(key.getValue(), offset.getValue()); - - return true; - } - - @Override - public void setCache(ResumeCache<?> cache) { - this.cache = (ResumeCache<String>) cache; - } - - @Override - public ResumeCache<?> getCache() { - return cache; - } - - @Override - public void configureGetShardIteratorRequest(GetShardIteratorRequest.Builder builder, String streamName, String shardId) { - KinesisOffset offset = cache.get(shardId, KinesisOffset.class); - - if (offset == null) { - LOG.info("There is no offset for the stream {}", streamName); - return; - } - - final String sequenceNumber = offset.getValue(); - LOG.info("Resuming from offset {} for key {}", sequenceNumber, streamName); - - builder.shardId(shardId); - builder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); - builder.startingSequenceNumber(sequenceNumber); - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java new file mode 100644 index 00000000000..11dcab57ae7 --- /dev/null +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.aws2.kinesis.consumer; + +import org.apache.camel.resume.ResumeAction; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +public class KinesisResumeAction implements ResumeAction { + + private GetShardIteratorRequest.Builder builder; + private String streamName; + private String shardId; + + public KinesisResumeAction() { + } + + public KinesisResumeAction(GetShardIteratorRequest.Builder builder) { + this.builder = builder; + } + + public void setBuilder(GetShardIteratorRequest.Builder builder) { + this.builder = builder; + } + + protected GetShardIteratorRequest.Builder getBuilder() { + return builder; + } + + public String getStreamName() { + return streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public String getShardId() { + return shardId; + } + + public void setShardId(String shardId) { + this.shardId = shardId; + } + + @Override + public boolean evalEntry(Object shardId, Object sequenceNumber) { + builder.shardId((String) shardId); + builder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); + builder.startingSequenceNumber((String) sequenceNumber); + return false; + } +} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java deleted file mode 100644 index a9f031f1866..00000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.aws2.kinesis.consumer; - -import org.apache.camel.resume.ResumeAdapter; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; - -/** - * The resume adapter for Kinesis - */ -public interface KinesisResumeAdapter extends ResumeAdapter { - /* - When consuming from multiple shards the KinesisResumeAdapter is potentially accessed by multiple threads. - To avoid any concurrency issues the configuration of the GetShardIteratorRequest should be done in one operation - and not using multiple calls like in the previous version of this interface - */ - void configureGetShardIteratorRequest(GetShardIteratorRequest.Builder builder, String streamName, String shardId); -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java deleted file mode 100644 index 41a9b10efbf..00000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis.integration; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.aws2.kinesis.consumer.KinesisConsumerOffsetProcessor; -import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategyConfiguration; -import org.apache.camel.processor.resume.TransientResumeStrategy; -import org.apache.camel.resume.cache.ResumeCache; -import org.apache.camel.test.infra.aws.common.services.AWSService; -import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; -import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; -import org.apache.camel.test.junit5.CamelTestSupport; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; - -import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.createStream; -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class KinesisConsumerResumeAfterRestartIT extends CamelTestSupport { - - @RegisterExtension - public static AWSService awsService = AWSServiceFactory.createSingletonKinesisService(); - private KinesisClient client; - - String streamName = "my-stream"; - - List<String> receivedMessages = new CopyOnWriteArrayList<>(); - - ResumeCache resumeCache = TransientResumeStrategy.createSimpleCache(); - - @Override - protected RouteBuilder createRouteBuilder() { - client = AWSSDKClientUtils.newKinesisClient(); - - context.getRegistry().bind("amazonKinesisClient", AWSSDKClientUtils.newKinesisClient()); - - return new RouteBuilder() { - @Override - public void configure() { - String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient"; - - fromF(kinesisEndpointUri, streamName) - // commenting out the strategy will cause the test to fail as event "First" will be consumed twice - .resumable().configuration(KinesisResumeStrategyConfiguration.builder().withResumeCache(resumeCache)) - .process(new KinesisConsumerOffsetProcessor()) - .process(exchange -> receivedMessages.add(exchange.getMessage().getBody(String.class))); - } - }; - } - - @BeforeEach - public void prepareEnvironment() { - createStream(client, streamName); - } - - private void sendEvent(String payload) { - client.putRecord(PutRecordRequest.builder().streamName(streamName).partitionKey("my-key") - .data(SdkBytes.fromUtf8String(payload)).build()); - } - - @Test - void shouldResumeConsumptionAfterRestart() { - - sendEvent("First"); - Awaitility.await().until(() -> receivedMessages.contains("First")); - - restartContext(); - - sendEvent("Second"); - Awaitility.await().until(() -> receivedMessages.contains("Second")); - - assertEquals(2, receivedMessages.size()); - } - - private void restartContext() { - context.stop(); - - // stop also seems to close the kinesis client, therefor we need to provide a new one - context.getRegistry().bind("amazonKinesisClient", AWSSDKClientUtils.newKinesisClient()); - - context.start(); - } -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java index 1868e7ad251..ce34c29bee4 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java @@ -25,9 +25,11 @@ import org.apache.camel.EndpointInject; import org.apache.camel.Message; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.aws2.kinesis.Kinesis2Constants; -import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAction; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategyConfiguration; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.resume.TransientResumeStrategy; +import org.apache.camel.resume.cache.ResumeCache; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; @@ -76,29 +78,26 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { } } - private static final class TestKinesisResumeAdapter implements KinesisResumeAdapter { + private static final class TestResumeAction extends KinesisResumeAction { private List<PutRecordsResponse> previousRecords; private final int expectedCount; - private GetShardIteratorRequest.Builder builder; - private TestKinesisResumeAdapter(int expectedCount) { + private TestResumeAction(int expectedCount) { this.expectedCount = expectedCount; } - @Override - public void resume() { - } - public void setPreviousRecords(List<PutRecordsResponse> previousRecords) { this.previousRecords = previousRecords; } + public int getExpectedCount() { + return expectedCount; + } + @Override - public void configureGetShardIteratorRequest( - GetShardIteratorRequest.Builder builder, String streamName, String shardId) { + public boolean evalEntry(Object shardId, Object sequenceNumber) { + final GetShardIteratorRequest.Builder builder = super.getBuilder(); ObjectHelper.notNull(builder, "builder"); - ObjectHelper.notNull(streamName, "streamName"); - ObjectHelper.notNull(shardId, "shardId"); LOG.debug("Waiting for data"); Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !previousRecords.isEmpty()); @@ -109,6 +108,7 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { builder.startingSequenceNumber(putRecordsResultEntry.sequenceNumber()); builder.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); + return false; } } @@ -126,7 +126,7 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { private final int expectedCount = messageCount / 2; private List<KinesisData> receivedMessages = new CopyOnWriteArrayList<>(); private List<PutRecordsResponse> previousRecords; - private TestKinesisResumeAdapter adapter = new TestKinesisResumeAdapter(expectedCount); + private TestResumeAction action = new TestResumeAction(expectedCount); @Override protected RouteBuilder createRouteBuilder() { @@ -137,11 +137,17 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() { - bindToRegistry("testResumeStrategy", new TransientResumeStrategy(adapter)); + final ResumeCache<Object> simpleCache = TransientResumeStrategy.createSimpleCache(); + final KinesisResumeStrategyConfiguration.KinesisResumeStrategyConfigurationBuilder resumeConfigurationBuilder + = KinesisResumeStrategyConfiguration.builder() + .withResumeCache(simpleCache); + + bindToRegistry(Kinesis2Constants.RESUME_ACTION, action); String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient"; fromF(kinesisEndpointUri, streamName) + .resumable().configuration(resumeConfigurationBuilder) .process(exchange -> { KinesisData data = new KinesisData(); final Message message = exchange.getMessage(); @@ -153,7 +159,6 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { receivedMessages.add(data); }) - .resumable("testResumeStrategy") .to("mock:result"); } }; @@ -172,7 +177,7 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { } } - adapter.setPreviousRecords(previousRecords); + action.setPreviousRecords(previousRecords); } @AfterEach diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java index 563f64c8889..a273c30ee1d 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java @@ -78,4 +78,8 @@ public class ResumeActionAwareAdapter implements ResumeActionAware, Cacheable, D return add(key, value); } + + protected ResumeAction getResumeAction() { + return resumeAction; + } } diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java index 5188a698bd9..f11004307de 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java @@ -2332,6 +2332,18 @@ public interface Kinesis2EndpointBuilderFactory { public String messageTimestamp() { return "CamelMessageTimestamp"; } + /** + * The resume action to execute when resuming. + * + * The option is a: {@code String} type. + * + * Group: consumer + * + * @return the name of the header {@code KinesisDbResumeAction}. + */ + public String kinesisDbResumeAction() { + return "CamelKinesisDbResumeAction"; + } /** * The shard ID of the shard where the data record was placed. *