This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new c08104ee967 kinesis connection retry mechanism added into the current consumer (#10870) c08104ee967 is described below commit c08104ee967dbe112af83b7813084a61d0bbd89f Author: Hamed Hatami <javaee.hat...@gmail.com> AuthorDate: Fri Jul 28 15:55:00 2023 +0200 kinesis connection retry mechanism added into the current consumer (#10870) * kinesis connection retry mechanism added into the current consumer * kinesis connection retry mechanism added into the current consumer * kinesis connection retry mechanism added into the current consumer * kinesis connection retry mechanism added into the current consumer * kinesis connection retry mechanism added into the current consumer * kinesis connection retry mechanism added into the current consumer * kinesis connection retry mechanism added into the current consumer --------- Co-authored-by: Hamed Hatami <hamed.hat...@postnord.com> --- .../component/aws2/kinesis/Kinesis2Consumer.java | 60 +++++++++++-------- .../component/aws2/kinesis/Kinesis2Endpoint.java | 25 +++++--- .../aws2/kinesis/consumer/KinesisConnection.java | 70 ++++++++++++++++++++++ .../aws2/kinesis/consumer/KinesisHealthCheck.java | 63 +++++++++++++++++++ .../KinesisConsumerClosedShardWithFailTest.java | 29 ++++++++- .../KinesisConsumerClosedShardWithSilentTest.java | 48 +++++++++++---- .../kinesis/integration/KinesisConsumerIT.java | 23 ++++--- .../integration/KinesisConsumerResumeIT.java | 7 ++- .../kinesis/integration/KinesisProducerIT.java | 3 + 9 files changed, 266 insertions(+), 62 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 6aef226e5ed..70481d598dc 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection; import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; @@ -33,8 +34,6 @@ import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; @@ -51,7 +50,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private boolean isShardClosed; private ResumeStrategy resumeStrategy; - public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) { + public Kinesis2Consumer(Kinesis2Endpoint endpoint, + Processor processor) { super(endpoint, processor); } @@ -59,6 +59,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R protected int poll() throws Exception { var processedExchangeCount = new AtomicInteger(0); + var kinesisConnection = KinesisConnection.getInstance(); if (!getEndpoint().getConfiguration().getShardId().isEmpty()) { var request = DescribeStreamRequest @@ -68,14 +69,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R DescribeStreamResponse response = null; if (getEndpoint().getConfiguration().isAsyncClient()) { try { - response = getAsyncClient() + response = kinesisConnection + .getAsyncClient(getEndpoint()) .describeStream(request) .get(); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } } else { - response = getClient().describeStream(request); + response = kinesisConnection + .getClient(getEndpoint()) + .describeStream(request); } var shard = response @@ -90,13 +94,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .findFirst() .orElseThrow(() -> new IllegalStateException("The shard can't be found")); - fetchAndPrepareRecordsForCamel(shard, processedExchangeCount); + fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount); } else { - getShardList() + getShardList(kinesisConnection) .parallelStream() .forEach(shard -> { - fetchAndPrepareRecordsForCamel(shard, processedExchangeCount); + fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount); }); } @@ -105,10 +109,11 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private void fetchAndPrepareRecordsForCamel( final Shard shard, + final KinesisConnection kinesisConnection, AtomicInteger processedExchangeCount) { String shardIterator = null; try { - shardIterator = getShardIterator(shard); + shardIterator = getShardIterator(shard, kinesisConnection); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } @@ -129,14 +134,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R GetRecordsResponse result = null; if (getEndpoint().getConfiguration().isAsyncClient()) { try { - result = getAsyncClient() + result = kinesisConnection + .getAsyncClient(getEndpoint()) .getRecords(req) .get(); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } } else { - result = getClient().getRecords(req); + result = kinesisConnection + .getClient(getEndpoint()) + .getRecords(req); } try { @@ -186,20 +194,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R return processedExchanges; } - private KinesisClient getClient() { - return getEndpoint().getClient(); - } - - private KinesisAsyncClient getAsyncClient() { - return getEndpoint().getAsyncClient(); - } - @Override public Kinesis2Endpoint getEndpoint() { return (Kinesis2Endpoint) super.getEndpoint(); } - private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException { + private String getShardIterator( + final Shard shard, + final KinesisConnection kinesisConnection) + throws ExecutionException, InterruptedException { var shardId = shard.shardId(); isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null; @@ -218,14 +221,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R GetShardIteratorResponse result = null; if (getEndpoint().getConfiguration().isAsyncClient()) { try { - result = getAsyncClient() + result = kinesisConnection + .getAsyncClient(getEndpoint()) .getShardIterator(request.build()) .get(); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } } else { - result = getClient().getShardIterator(request.build()); + result = kinesisConnection + .getClient(getEndpoint()) + .getShardIterator(request.build()); } return result.shardIterator(); @@ -298,7 +304,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R return getEndpoint().getConfiguration(); } - private List<Shard> getShardList() { + private List<Shard> getShardList(final KinesisConnection kinesisConnection) { var request = ListShardsRequest .builder() @@ -308,7 +314,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R List<Shard> shardList = null; if (getEndpoint().getConfiguration().isAsyncClient()) { try { - shardList = getAsyncClient() + shardList = kinesisConnection + .getAsyncClient(getEndpoint()) .listShards(request) .get() .shards(); @@ -316,7 +323,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R throw new RuntimeException(e); } } else { - shardList = getClient().listShards(request).shards(); + shardList = kinesisConnection + .getClient(getEndpoint()) + .listShards(request) + .shards(); } return shardList; diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java index e4d4d122c96..b97222fadd7 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java @@ -17,12 +17,14 @@ package org.apache.camel.component.aws2.kinesis; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisHealthCheck; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.support.ScheduledPollEndpoint; @@ -45,6 +47,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { private KinesisClient kinesisClient; private KinesisAsyncClient kinesisAsyncClient; + private static final String CONNECTION_CHECKER_EXECUTOR_NAME = "Kinesis_Streaming_Connection_Checker"; public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) { super(uri, component); @@ -54,19 +57,18 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { @Override protected void doStart() throws Exception { super.doStart(); + + var kinesisConnection = KinesisConnection.getInstance(); + if (!configuration.isCborEnabled()) { System.setProperty(CBOR_ENABLED.property(), "false"); } if (configuration.isAsyncClient() && Objects.isNull(configuration.getAmazonKinesisClient())) { - kinesisAsyncClient = KinesisClientFactory - .getKinesisAsyncClient(configuration) - .getKinesisAsyncClient(); + kinesisAsyncClient = kinesisConnection.getAsyncClient(this); } else { - kinesisClient = configuration.getAmazonKinesisClient() != null - ? configuration.getAmazonKinesisClient() - : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient(); + kinesisClient = kinesisConnection.getClient(this); } if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) @@ -101,10 +103,19 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { public Consumer createConsumer(Processor processor) throws Exception { final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor); consumer.setSchedulerProperties(getSchedulerProperties()); + startHealthChecks(); configureConsumer(consumer); return consumer; } + private void startHealthChecks() { + var timeoutCheckerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, + CONNECTION_CHECKER_EXECUTOR_NAME); + timeoutCheckerExecutorService.scheduleAtFixedRate(new KinesisHealthCheck(this), + 0, 5 * 1000, + TimeUnit.MILLISECONDS); + } + @Override public Kinesis2Component getComponent() { return (Kinesis2Component) super.getComponent(); diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java new file mode 100644 index 00000000000..4ac3c3046b5 --- /dev/null +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.kinesis.consumer; + +import java.util.Objects; + +import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint; +import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; + +public class KinesisConnection { + private static volatile KinesisConnection instance; + private KinesisClient kinesisClient = null; + private KinesisAsyncClient kinesisAsyncClient = null; + + private KinesisConnection() { + } + + public static synchronized KinesisConnection getInstance() { + if (instance == null) { + synchronized (KinesisConnection.class) { + if (instance == null) { + instance = new KinesisConnection(); + } + } + } + return instance; + } + + public KinesisClient getClient(final Kinesis2Endpoint endpoint) { + if (Objects.isNull(kinesisClient)) { + kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null + ? endpoint.getConfiguration().getAmazonKinesisClient() + : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration()).getKinesisClient(); + } + return kinesisClient; + } + + public KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) { + if (Objects.isNull(kinesisAsyncClient)) { + kinesisAsyncClient = KinesisClientFactory + .getKinesisAsyncClient(endpoint.getConfiguration()) + .getKinesisAsyncClient(); + } + return kinesisAsyncClient; + } + + public void setKinesisClient(final KinesisClient kinesisClient) { + this.kinesisClient = kinesisClient; + } + + public void setKinesisAsyncClient(final KinesisAsyncClient kinesisAsyncClient) { + this.kinesisAsyncClient = kinesisAsyncClient; + } +} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java new file mode 100644 index 00000000000..5bf59880dc2 --- /dev/null +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.kinesis.consumer; + +import java.util.Objects; +import java.util.concurrent.ExecutionException; + +import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint; +import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; + +public class KinesisHealthCheck implements Runnable { + private Kinesis2Endpoint endpoint; + + public KinesisHealthCheck(Kinesis2Endpoint endpoint) { + this.endpoint = endpoint; + } + + @Override + public void run() { + var kinesisConnection = KinesisConnection.getInstance(); + if (this.endpoint.getConfiguration().isAsyncClient()) { + try { + if (Objects.isNull(kinesisConnection.getAsyncClient(this.endpoint)) || + kinesisConnection.getAsyncClient(this.endpoint) + .listStreams(ListStreamsRequest + .builder() + .build()) + .get() + .streamNames() + .isEmpty()) { + kinesisConnection.setKinesisAsyncClient(endpoint.getAsyncClient()); + } + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + if (Objects.isNull(kinesisConnection.getClient(this.endpoint)) || + kinesisConnection.getClient(this.endpoint) + .listStreams(ListStreamsRequest + .builder() + .build()) + .streamNames() + .isEmpty()) { + kinesisConnection.setKinesisClient(endpoint.getClient()); + } + } + } + +} diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java index 48e739e423f..2d7690a78fd 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java @@ -16,10 +16,12 @@ */ package org.apache.camel.component.aws2.kinesis; +import java.lang.reflect.Field; import java.util.ArrayList; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection; import org.apache.camel.impl.DefaultCamelContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -52,6 +54,8 @@ public class KinesisConsumerClosedShardWithFailTest { private KinesisClient kinesisClient; @Mock private AsyncProcessor processor; + @Mock + private KinesisConnection kinesisConnection; private final CamelContext context = new DefaultCamelContext(); private final Kinesis2Component component = new Kinesis2Component(context); @@ -65,6 +69,9 @@ public class KinesisConsumerClosedShardWithFailTest { configuration.setIteratorType(ShardIteratorType.LATEST); configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.fail); configuration.setStreamName("streamName"); + + setMock(kinesisConnection); + Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component); endpoint.start(); underTest = new Kinesis2Consumer(endpoint, processor); @@ -74,14 +81,30 @@ public class KinesisConsumerClosedShardWithFailTest { ArrayList<Shard> shardList = new ArrayList<>(); shardList.add(shard); - when(kinesisClient.getRecords(any(GetRecordsRequest.class))) + when(kinesisConnection + .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient); + + when(kinesisClient + .getRecords(any(GetRecordsRequest.class))) .thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build()); - when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) + when(kinesisClient + .getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build()); - when(kinesisClient.listShards(any(ListShardsRequest.class))) + when(kinesisClient + .listShards(any(ListShardsRequest.class))) .thenReturn(ListShardsResponse.builder().shards(shardList).build()); } + private void setMock(KinesisConnection mock) { + try { + Field instance = KinesisConnection.class.getDeclaredField("instance"); + instance.setAccessible(true); + instance.set(instance, mock); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Test public void itObtainsAShardIteratorOnFirstPoll() { assertThrows(IllegalStateException.class, () -> { diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java index 2b34ca315ed..4605cd5b60f 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java @@ -16,11 +16,13 @@ */ package org.apache.camel.component.aws2.kinesis; +import java.lang.reflect.Field; import java.nio.charset.Charset; import java.util.ArrayList; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection; import org.apache.camel.impl.DefaultCamelContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -60,7 +62,8 @@ public class KinesisConsumerClosedShardWithSilentTest { private KinesisClient kinesisClient; @Mock private AsyncProcessor processor; - + @Mock + private KinesisConnection kinesisConnection; private final CamelContext context = new DefaultCamelContext(); private final Kinesis2Component component = new Kinesis2Component(context); @@ -73,6 +76,9 @@ public class KinesisConsumerClosedShardWithSilentTest { configuration.setIteratorType(ShardIteratorType.LATEST); configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent); configuration.setStreamName("streamName"); + + setMock(kinesisConnection); + Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component); endpoint.start(); underTest = new Kinesis2Consumer(endpoint, processor); @@ -82,21 +88,41 @@ public class KinesisConsumerClosedShardWithSilentTest { ArrayList<Shard> shardList = new ArrayList<>(); shardList.add(shard); - when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder() - .nextShardIterator("shardIterator") - .records( - Record.builder().sequenceNumber("1").data(SdkBytes.fromString("Hello", Charset.defaultCharset())) - .build(), - Record.builder().sequenceNumber("2").data(SdkBytes.fromString("Hello", Charset.defaultCharset())) - .build()) - .build()); - when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) + when(kinesisConnection + .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient); + + when(kinesisClient + .getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder() + .nextShardIterator("shardIterator") + .records( + Record.builder().sequenceNumber("1") + .data(SdkBytes.fromString("Hello", Charset.defaultCharset())) + .build(), + Record.builder().sequenceNumber("2") + .data(SdkBytes.fromString("Hello", Charset.defaultCharset())) + .build()) + .build()); + + when(kinesisClient + .getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build()); - when(kinesisClient.listShards(any(ListShardsRequest.class))) + when(kinesisClient + .listShards(any(ListShardsRequest.class))) .thenReturn(ListShardsResponse.builder().shards(shardList).build()); context.start(); underTest.start(); + + } + + private void setMock(KinesisConnection mock) { + try { + Field instance = KinesisConnection.class.getDeclaredField("instance"); + instance.setAccessible(true); + instance.set(instance, mock); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Test diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java index beebc63905e..7cc59431eb8 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java @@ -22,11 +22,10 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.camel.EndpointInject; -import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.aws2.kinesis.Kinesis2Constants; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -81,6 +80,7 @@ public class KinesisConsumerIT extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() { client = AWSSDKClientUtils.newKinesisClient(); + KinesisConnection.getInstance().setKinesisClient(client); context.getRegistry().bind("amazonKinesisClient", client); @@ -90,20 +90,17 @@ public class KinesisConsumerIT extends CamelTestSupport { String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient"; fromF(kinesisEndpointUri, streamName) - .process(new Processor() { - @Override - public void process(Exchange exchange) { - KinesisData data = new KinesisData(); + .process(exchange -> { + KinesisData data = new KinesisData(); - final Message message = exchange.getMessage(); + final Message message = exchange.getMessage(); - if (message != null) { - data.body = message.getBody(String.class); - data.partition = message.getHeader(Kinesis2Constants.PARTITION_KEY, String.class); - } - - receivedMessages.add(data); + if (message != null) { + data.body = message.getBody(String.class); + data.partition = message.getHeader(Kinesis2Constants.PARTITION_KEY, String.class); } + + receivedMessages.add(data); }) .to("mock:result"); } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java index 2e6a9a8fe4a..20f508e13bd 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java @@ -25,6 +25,7 @@ import org.apache.camel.EndpointInject; import org.apache.camel.Message; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.aws2.kinesis.Kinesis2Constants; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection; import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.resume.TransientResumeStrategy; @@ -132,6 +133,7 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() { client = AWSSDKClientUtils.newKinesisClient(); + KinesisConnection.getInstance().setKinesisClient(client); context.getRegistry().bind("amazonKinesisClient", client); @@ -145,7 +147,6 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { fromF(kinesisEndpointUri, streamName) .process(exchange -> { KinesisData data = new KinesisData(); - final Message message = exchange.getMessage(); if (message != null) { @@ -183,12 +184,12 @@ public class KinesisConsumerResumeIT extends CamelTestSupport { } @DisplayName("Tests that the component can resume messages from AWS Kinesis") - @Timeout(value = 2, unit = TimeUnit.MINUTES) + @Timeout(value = 3, unit = TimeUnit.MINUTES) @Test void testProduceMessages() { result.expectedMessageCount(expectedCount); - await().atMost(1, TimeUnit.MINUTES) + await().atMost(2, TimeUnit.MINUTES) .untilAsserted(() -> result.assertIsSatisfied()); assertEquals(expectedCount, receivedMessages.size()); diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java index 036f54f90e9..7c5207ffd70 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java @@ -25,6 +25,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.aws2.kinesis.Kinesis2Constants; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.services.AWSService; @@ -119,6 +120,8 @@ public class KinesisProducerIT extends CamelTestSupport { protected RouteBuilder createRouteBuilder() { client = AWSSDKClientUtils.newKinesisClient(); + KinesisConnection.getInstance().setKinesisClient(client); + context.getRegistry().bind("amazonKinesisClient", client); return new RouteBuilder() {