This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push: new 46ec41887d6 CAMEL-19491: aws2-sqs - use `ScheduledPollConsumerHealthCheck` (#10847) (#10860) 46ec41887d6 is described below commit 46ec41887d6381ddbebab0c7cf699f7cc1d2f0ed Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jul 27 18:05:45 2023 +0200 CAMEL-19491: aws2-sqs - use `ScheduledPollConsumerHealthCheck` (#10847) (#10860) * CAMEL-19491: aws2-sqs - use `ScheduledPollConsumerHealthCheck` The SQS consumer is a polling consumer, and thus we can use `ScheduledPollConsumerHealthCheck` where we observe the last poll rather than making a new operation against SQS. Previously `listQueues` was called, which is a different API than what the consumer actually uses (`receiveMessage`). The `Sqs2ConsumerHealthCheck` used to check if configured region existed in the metadata of the client. That logic was copied over from the old `Sqs2ComponentVerifierExtension`. It is removed from this implementation as the call to `poll` will yield in the same outcome in case an incorrect Region is defined in the configuration. In this case, a failed poll, causing the health check to be DOWN. * CAMEL-19491: aws2-ddb - use ScheduledPollConsumerHealthCheck * CAMEL-19491: aws2-cloudtrail - use ScheduledPollConsumerHealthCheck * CAMEL-19491: aws2-s3 - use ScheduledPollConsumerHealthCheck * CAMEL-19491: aws2-kinesis - use `ScheduledPollConsumerHealthCheck` Co-authored-by: Simon Oxenvad Rasmussen <s...@viabill.com> --- .../aws/cloudtrail/CloudtrailConsumer.java | 19 ------ .../cloudtrail/CloudtrailConsumerHealthCheck.java | 74 --------------------- .../component/aws2/ddb/Ddb2ClientHealthCheck.java | 70 -------------------- .../camel/component/aws2/ddb/Ddb2Endpoint.java | 14 ---- .../ddb/Ddb2ClientHealthCheckProfileCredsTest.java | 8 +-- .../ddb/Ddb2ClientHealthCheckStaticCredsTest.java | 8 +-- .../Ddb2StreamConsumerHealthCustomClientTest.java | 10 ++- .../component/aws2/kinesis/Kinesis2Consumer.java | 15 ----- .../aws2/kinesis/Kinesis2ConsumerHealthCheck.java | 73 --------------------- ...inesis2ConsumerHealthCheckProfileCredsTest.java | 5 +- ...Kinesis2ConsumerHealthCheckStaticCredsTest.java | 9 +-- .../Kinesis2ConsumerHealthCustomClientTest.java | 3 +- .../camel/component/aws2/s3/AWS2S3Consumer.java | 18 ------ .../aws2/s3/AWS2S3ConsumerHealthCheck.java | 75 ---------------------- .../AWS2S3ConsumerHealthCheckProfileCredsTest.java | 6 +- .../AWS2S3ConsumerHealthCheckStaticCredsTest.java | 6 +- .../camel/component/aws2/sqs/Sqs2Consumer.java | 14 ---- .../aws2/sqs/Sqs2ConsumerHealthCheck.java | 74 --------------------- .../Sqs2ConsumerHealthCheckProfileCredsTest.java | 6 +- .../Sqs2ConsumerHealthCheckStaticCredsTest.java | 6 +- 20 files changed, 23 insertions(+), 490 deletions(-) diff --git a/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java b/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java index cdb922a3137..6426d92e7f3 100644 --- a/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java +++ b/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java @@ -26,8 +26,6 @@ import java.util.Queue; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.health.HealthCheckHelper; -import org.apache.camel.health.WritableHealthCheckRepository; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -41,9 +39,6 @@ import software.amazon.awssdk.services.cloudtrail.model.LookupEventsResponse; public class CloudtrailConsumer extends ScheduledBatchPollingConsumer { private static Instant lastTime; - private WritableHealthCheckRepository healthCheckRepository; - private CloudtrailConsumerHealthCheck consumerHealthCheck; - public CloudtrailConsumer(CloudtrailEndpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -51,16 +46,6 @@ public class CloudtrailConsumer extends ScheduledBatchPollingConsumer { @Override protected void doStart() throws Exception { super.doStart(); - - healthCheckRepository = HealthCheckHelper.getHealthCheckRepository( - getEndpoint().getCamelContext(), - "components", - WritableHealthCheckRepository.class); - - if (healthCheckRepository != null) { - consumerHealthCheck = new CloudtrailConsumerHealthCheck(this, getRouteId()); - healthCheckRepository.addHealthCheck(consumerHealthCheck); - } } @Override @@ -107,10 +92,6 @@ public class CloudtrailConsumer extends ScheduledBatchPollingConsumer { @Override protected void doStop() throws Exception { - if (healthCheckRepository != null && consumerHealthCheck != null) { - healthCheckRepository.removeHealthCheck(consumerHealthCheck); - consumerHealthCheck = null; - } super.doStop(); } diff --git a/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumerHealthCheck.java b/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumerHealthCheck.java deleted file mode 100644 index f9983cb7966..00000000000 --- a/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumerHealthCheck.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws.cloudtrail; - -import java.util.Map; - -import org.apache.camel.health.HealthCheckResultBuilder; -import org.apache.camel.impl.health.AbstractHealthCheck; -import org.apache.camel.util.ObjectHelper; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.cloudtrail.CloudTrailClient; -import software.amazon.awssdk.services.cloudtrail.model.ListChannelsRequest; - -public class CloudtrailConsumerHealthCheck extends AbstractHealthCheck { - - private final CloudtrailConsumer cloudtrailConsumer; - - public CloudtrailConsumerHealthCheck(CloudtrailConsumer cloudtrailConsumer, String routeId) { - super("camel", "aws-cloudtrail-consumer-" + routeId); - this.cloudtrailConsumer = cloudtrailConsumer; - } - - @Override - protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) { - - CloudtrailConfiguration configuration = cloudtrailConsumer.getEndpoint().getConfiguration(); - try { - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - if (!CloudTrailClient.serviceMetadata().regions().contains(Region.of(configuration.getRegion()))) { - builder.message("The service is not supported in this region"); - builder.down(); - return; - } - } - CloudTrailClient client = cloudtrailConsumer.getEndpoint().getClient(); - client.listChannels(ListChannelsRequest.builder().maxResults(1).build()); - } catch (AwsServiceException e) { - builder.message(e.getMessage()); - builder.error(e); - if (ObjectHelper.isNotEmpty(e.statusCode())) { - builder.detail(SERVICE_STATUS_CODE, e.statusCode()); - } - if (ObjectHelper.isNotEmpty(e.awsErrorDetails().errorCode())) { - builder.detail(SERVICE_ERROR_CODE, e.awsErrorDetails().errorCode()); - } - builder.down(); - return; - - } catch (Exception e) { - builder.error(e); - builder.down(); - return; - } - - builder.up(); - - } - -} diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheck.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheck.java deleted file mode 100644 index c66530c1dfd..00000000000 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheck.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.aws2.ddb; - -import java.util.Map; - -import org.apache.camel.health.HealthCheckResultBuilder; -import org.apache.camel.impl.health.AbstractHealthCheck; -import org.apache.camel.util.ObjectHelper; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.dynamodb.DynamoDbClient; -import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; - -public class Ddb2ClientHealthCheck extends AbstractHealthCheck { - - private final Ddb2Endpoint ddb2Endpoint; - - public Ddb2ClientHealthCheck(Ddb2Endpoint ddb2Endpoint, String clientId) { - super("camel", "aws2-ddb-client-" + clientId); - this.ddb2Endpoint = ddb2Endpoint; - } - - @Override - protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) { - Ddb2Configuration configuration = ddb2Endpoint.getConfiguration(); - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - if (!DynamoDbClient.serviceMetadata().regions().contains(Region.of(configuration.getRegion()))) { - builder.message("The service is not supported in this region"); - builder.down(); - return; - } - } - try { - DynamoDbClient ddbClient = ddb2Endpoint.getDdbClient(); - ddbClient.listTables(ListTablesRequest.builder().limit(1).build()); - } catch (AwsServiceException e) { - builder.message(e.getMessage()); - builder.error(e); - if (ObjectHelper.isNotEmpty(e.statusCode())) { - builder.detail(SERVICE_STATUS_CODE, e.statusCode()); - } - if (ObjectHelper.isNotEmpty(e.awsErrorDetails().errorCode())) { - builder.detail(SERVICE_ERROR_CODE, e.awsErrorDetails().errorCode()); - } - builder.down(); - return; - } catch (Exception e) { - builder.error(e); - builder.down(); - return; - } - builder.up(); - } -} diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java index 996740c60cd..cace6fcfb72 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java @@ -25,8 +25,6 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.aws2.ddb.client.Ddb2ClientFactory; -import org.apache.camel.health.HealthCheckHelper; -import org.apache.camel.impl.health.ComponentsHealthCheckRepository; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.support.ScheduledPollEndpoint; @@ -58,9 +56,6 @@ public class Ddb2Endpoint extends ScheduledPollEndpoint { private static final Logger LOG = LoggerFactory.getLogger(Ddb2Endpoint.class); - private ComponentsHealthCheckRepository healthCheckRepository; - private Ddb2ClientHealthCheck clientHealthCheck; - @UriParam private Ddb2Configuration configuration; @@ -85,15 +80,6 @@ public class Ddb2Endpoint extends ScheduledPollEndpoint { public void doStart() throws Exception { super.doStart(); - healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(getCamelContext(), - ComponentsHealthCheckRepository.REPOSITORY_ID, ComponentsHealthCheckRepository.class); - - if (healthCheckRepository != null) { - // Do not register the health check until we resolve CAMEL-18992 - //clientHealthCheck = new Ddb2ClientHealthCheck(this, getId()); - //healthCheckRepository.addHealthCheck(clientHealthCheck); - } - ddbClient = configuration.getAmazonDDBClient() != null ? configuration.getAmazonDDBClient() : Ddb2ClientFactory.getDynamoDBClient(configuration).getDynamoDBClient(); diff --git a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckProfileCredsTest.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckProfileCredsTest.java index 285f3941139..8fc9bcc08d0 100644 --- a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckProfileCredsTest.java +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckProfileCredsTest.java @@ -66,7 +66,7 @@ public class Ddb2ClientHealthCheckProfileCredsTest extends CamelTestSupport { @Override public void configure() { - from("direct:listTables") + from("direct:listTables").routeId("ddb-route") .to("aws2-ddb://test?region=l&useDefaultCredentialsProvider=true&enabledInitialDescribeTable=false"); } }; @@ -85,14 +85,12 @@ public class Ddb2ClientHealthCheckProfileCredsTest extends CamelTestSupport { Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsAws2DdbHealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-ddb-client")) + .filter(result -> result.getCheck().getId().startsWith("consumer:ddb-route")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); + Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsAws2DdbHealthCheck, "aws2-ddb check"); - Assertions.assertTrue(hasRegionMessage, "aws2-ddb check error message"); }); } diff --git a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckStaticCredsTest.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckStaticCredsTest.java index 677501980ff..592affc075a 100644 --- a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckStaticCredsTest.java +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddb/Ddb2ClientHealthCheckStaticCredsTest.java @@ -66,7 +66,7 @@ public class Ddb2ClientHealthCheckStaticCredsTest extends CamelTestSupport { @Override public void configure() { - from("direct:listClusters") + from("direct:listClusters").routeId("ddb-route") .to("aws2-ddb://test?region=l&secretKey=l&accessKey=k&enabledInitialDescribeTable=false"); } }; @@ -85,14 +85,12 @@ public class Ddb2ClientHealthCheckStaticCredsTest extends CamelTestSupport { Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsAws2DdbHealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-ddb-client")) + .filter(result -> result.getCheck().getId().startsWith("consumer:ddb-route")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); + Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsAws2DdbHealthCheck, "aws2-ddb check"); - Assertions.assertTrue(hasRegionMessage, "aws2-ddb check error message"); }); } diff --git a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumerHealthCustomClientTest.java b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumerHealthCustomClientTest.java index 7a4f9b7410e..078e0740313 100644 --- a/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumerHealthCustomClientTest.java +++ b/components/camel-aws/camel-aws2-ddb/src/test/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumerHealthCustomClientTest.java @@ -89,15 +89,13 @@ public class Ddb2StreamConsumerHealthCustomClientTest extends CamelTestSupport { await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); - boolean containsAws2DdbStreamHealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-ddbstream-consumer")) + boolean containsAws2SqsHealthCheck = res2.stream() + .filter(result -> result.getCheck().getId().startsWith("consumer:test-health-it")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); + Assertions.assertTrue(down, "liveness check"); - Assertions.assertTrue(containsAws2DdbStreamHealthCheck, "aws2-ddbstream check"); - Assertions.assertFalse(hasRegionMessage, "aws2-ddbstream check error message"); + Assertions.assertTrue(containsAws2SqsHealthCheck, "aws2-sqs check"); }); } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index e3eae1edd2d..d347ef3a46b 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -24,8 +24,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter; -import org.apache.camel.health.HealthCheckHelper; -import org.apache.camel.health.WritableHealthCheckRepository; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; import org.apache.camel.support.ScheduledBatchPollingConsumer; @@ -52,9 +50,6 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private boolean isShardClosed; private ResumeStrategy resumeStrategy; - private WritableHealthCheckRepository healthCheckRepository; - private Kinesis2ConsumerHealthCheck consumerHealthCheck; - public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -241,16 +236,6 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R protected void doStart() throws Exception { super.doStart(); - healthCheckRepository = HealthCheckHelper.getHealthCheckRepository( - getEndpoint().getCamelContext(), - "components", - WritableHealthCheckRepository.class); - - if (healthCheckRepository != null) { - consumerHealthCheck = new Kinesis2ConsumerHealthCheck(this, getRouteId()); - healthCheckRepository.addHealthCheck(consumerHealthCheck); - } - if (resumeStrategy != null) { resumeStrategy.loadCache(); } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java deleted file mode 100644 index c2f53ebcfd4..00000000000 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheck.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.kinesis; - -import java.util.Map; - -import org.apache.camel.health.HealthCheckResultBuilder; -import org.apache.camel.impl.health.AbstractHealthCheck; -import org.apache.camel.util.ObjectHelper; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.kinesis.KinesisClient; - -public class Kinesis2ConsumerHealthCheck extends AbstractHealthCheck { - - private final Kinesis2Consumer kinesis2Consumer; - - public Kinesis2ConsumerHealthCheck(Kinesis2Consumer kinesis2Consumer, String routeId) { - super("camel", "aws2-kinesis-consumer-" + routeId); - this.kinesis2Consumer = kinesis2Consumer; - } - - @Override - protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) { - - try { - Kinesis2Configuration configuration = kinesis2Consumer.getConfiguration(); - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - if (!KinesisClient.serviceMetadata().regions().contains(Region.of(configuration.getRegion()))) { - builder.message("The service is not supported in this region"); - builder.down(); - return; - } - } - KinesisClient client = kinesis2Consumer.getEndpoint().getClient(); - client.listStreams(); - } catch (AwsServiceException e) { - builder.message(e.getMessage()); - builder.error(e); - if (ObjectHelper.isNotEmpty(e.statusCode())) { - builder.detail(SERVICE_STATUS_CODE, e.statusCode()); - } - if (ObjectHelper.isNotEmpty(e.awsErrorDetails().errorCode())) { - builder.detail(SERVICE_ERROR_CODE, e.awsErrorDetails().errorCode()); - } - builder.down(); - return; - - } catch (Exception e) { - builder.error(e); - builder.down(); - return; - } - - builder.up(); - - } - -} diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java index 30c8e6696a3..6a161b5cb7b 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsTest.java @@ -92,14 +92,11 @@ public class Kinesis2ConsumerHealthCheckProfileCredsTest extends CamelTestSuppor Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsKinesis2HealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-kinesis-consumer")) + .filter(result -> result.getCheck().getId().startsWith("consumer:test-health-it")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsKinesis2HealthCheck, "aws2-kinesis check"); - Assertions.assertTrue(hasRegionMessage, "aws2-kinesis check error message"); }); } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java index 3e6e7e8e376..2aa847f378b 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckStaticCredsTest.java @@ -75,8 +75,8 @@ public class Kinesis2ConsumerHealthCheckStaticCredsTest extends CamelTestSupport @Override public void configure() { - from("aws2-kinesis://stream?region=l&secretKey=l&accessKey=k") - .startupOrder(2).log("${body}").routeId("test-health-it"); + from("aws2-kinesis://stream?region=l&secretKey=l&accessKey=k").startupOrder(2).log("${body}") + .routeId("test-health-it"); } }; } @@ -93,14 +93,11 @@ public class Kinesis2ConsumerHealthCheckStaticCredsTest extends CamelTestSupport Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsKinesis2HealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-kinesis-consumer")) + .filter(result -> result.getCheck().getId().startsWith("consumer:test-health-it")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsKinesis2HealthCheck, "aws2-kinesis check"); - Assertions.assertTrue(hasRegionMessage, "aws2-kinesis check error message"); }); } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java index 6eb4c84bbd3..eabadefea22 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCustomClientTest.java @@ -76,8 +76,7 @@ public class Kinesis2ConsumerHealthCustomClientTest extends CamelTestSupport { @Override public void configure() { - from("aws2-kinesis://stream") - .startupOrder(2).log("${body}").routeId("test-health-it"); + from("aws2-kinesis://stream").startupOrder(2).log("${body}").routeId("test-health-it"); } }; } diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java index b19c85b5abf..4bbd7ba8a0c 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java @@ -32,8 +32,6 @@ import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.health.HealthCheckHelper; -import org.apache.camel.health.WritableHealthCheckRepository; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.support.SynchronizationAdapter; @@ -71,8 +69,6 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { private String marker; private transient String s3ConsumerToString; - private WritableHealthCheckRepository healthCheckRepository; - private AWS2S3ConsumerHealthCheck consumerHealthCheck; public AWS2S3Consumer(AWS2S3Endpoint endpoint, Processor processor) { super(endpoint, processor); @@ -82,16 +78,6 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { protected void doStart() throws Exception { super.doStart(); - healthCheckRepository = HealthCheckHelper.getHealthCheckRepository( - getEndpoint().getCamelContext(), - "components", - WritableHealthCheckRepository.class); - - if (healthCheckRepository != null) { - consumerHealthCheck = new AWS2S3ConsumerHealthCheck(this, getRouteId()); - healthCheckRepository.addHealthCheck(consumerHealthCheck); - } - if (getConfiguration().isMoveAfterRead()) { try { getAmazonS3Client() @@ -459,10 +445,6 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { @Override protected void doStop() throws Exception { - if (healthCheckRepository != null && consumerHealthCheck != null) { - healthCheckRepository.removeHealthCheck(consumerHealthCheck); - consumerHealthCheck = null; - } super.doStop(); } } diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheck.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheck.java deleted file mode 100644 index 2cb8b875b68..00000000000 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheck.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.s3; - -import java.util.Map; - -import org.apache.camel.health.HealthCheckResultBuilder; -import org.apache.camel.impl.health.AbstractHealthCheck; -import org.apache.camel.util.ObjectHelper; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.HeadBucketRequest; - -public class AWS2S3ConsumerHealthCheck extends AbstractHealthCheck { - - private final AWS2S3Consumer aws2S3Consumer; - - public AWS2S3ConsumerHealthCheck(AWS2S3Consumer aws2S3Consumer, String routeId) { - super("camel", "aws2-s3-consumer-" + routeId); - this.aws2S3Consumer = aws2S3Consumer; - } - - @Override - protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) { - - AWS2S3Configuration configuration = aws2S3Consumer.getConfiguration(); - try { - - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - if (!S3Client.serviceMetadata().regions().contains(Region.of(configuration.getRegion()))) { - builder.message("The service is not supported in this region"); - builder.down(); - return; - } - } - S3Client client = aws2S3Consumer.getAmazonS3Client(); - client.headBucket(HeadBucketRequest.builder().bucket(configuration.getBucketName()).build()); - } catch (AwsServiceException e) { - builder.message(e.getMessage()); - builder.error(e); - if (ObjectHelper.isNotEmpty(e.statusCode())) { - builder.detail(SERVICE_STATUS_CODE, e.statusCode()); - } - if (ObjectHelper.isNotEmpty(e.awsErrorDetails().errorCode())) { - builder.detail(SERVICE_ERROR_CODE, e.awsErrorDetails().errorCode()); - } - builder.down(); - return; - - } catch (Exception e) { - builder.error(e); - builder.down(); - return; - } - - builder.up(); - - } - -} diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckProfileCredsTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckProfileCredsTest.java index f242e26cceb..db56bab3162 100644 --- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckProfileCredsTest.java +++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckProfileCredsTest.java @@ -92,14 +92,12 @@ public class AWS2S3ConsumerHealthCheckProfileCredsTest extends CamelTestSupport Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsAws2S3HealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-s3-consumer")) + .filter(result -> result.getCheck().getId().startsWith("consumer:test-health-it")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); + Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsAws2S3HealthCheck, "aws2-s3 check"); - Assertions.assertTrue(hasRegionMessage, "aws2-s3 check error message"); }); } diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckStaticCredsTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckStaticCredsTest.java index 275f323374c..8992ba05f65 100644 --- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckStaticCredsTest.java +++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/AWS2S3ConsumerHealthCheckStaticCredsTest.java @@ -93,14 +93,12 @@ public class AWS2S3ConsumerHealthCheckStaticCredsTest extends CamelTestSupport { Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsAws2S3HealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-s3-consumer")) + .filter(result -> result.getCheck().getId().startsWith("consumer:test-health-it")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); + Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsAws2S3HealthCheck, "aws2-s3 check"); - Assertions.assertTrue(hasRegionMessage, "aws2-s3 check error message"); }); } diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index 3cc3cb1fc18..c9f27c93d1d 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -35,8 +35,6 @@ import org.apache.camel.ExchangePropertyKey; import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.camel.health.HealthCheckHelper; -import org.apache.camel.health.WritableHealthCheckRepository; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.ScheduledPollConsumerScheduler; import org.apache.camel.spi.Synchronization; @@ -73,8 +71,6 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { private transient String sqsConsumerToString; private Collection<String> attributeNames; private Collection<String> messageAttributeNames; - private WritableHealthCheckRepository healthCheckRepository; - private Sqs2ConsumerHealthCheck consumerHealthCheck; public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) { super(endpoint, processor); @@ -377,16 +373,6 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { } super.doStart(); - - // health-check is optional so discover and resolve - healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(getEndpoint().getCamelContext(), "components", - WritableHealthCheckRepository.class); - - if (healthCheckRepository != null) { - consumerHealthCheck = new Sqs2ConsumerHealthCheck(this, getRouteId()); - healthCheckRepository.addHealthCheck(consumerHealthCheck); - } - } @Override diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheck.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheck.java deleted file mode 100644 index 24721beebb6..00000000000 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheck.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.aws2.sqs; - -import java.util.Map; - -import org.apache.camel.health.HealthCheckResultBuilder; -import org.apache.camel.impl.health.AbstractHealthCheck; -import org.apache.camel.util.ObjectHelper; -import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.ListQueuesRequest; - -public class Sqs2ConsumerHealthCheck extends AbstractHealthCheck { - - private final Sqs2Consumer sqs2Consumer; - - public Sqs2ConsumerHealthCheck(Sqs2Consumer sqs2Consumer, String routeId) { - super("camel", "aws2-sqs-consumer-" + routeId); - this.sqs2Consumer = sqs2Consumer; - } - - @Override - protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) { - - Sqs2Configuration configuration = sqs2Consumer.getConfiguration(); - try { - if (ObjectHelper.isNotEmpty(configuration.getRegion())) { - if (!SqsClient.serviceMetadata().regions().contains(Region.of(configuration.getRegion()))) { - builder.message("The service is not supported in this region"); - builder.down(); - return; - } - } - SqsClient client = sqs2Consumer.getClient(); - client.listQueues(ListQueuesRequest.builder().maxResults(1).build()); - } catch (AwsServiceException e) { - builder.message(e.getMessage()); - builder.error(e); - if (ObjectHelper.isNotEmpty(e.statusCode())) { - builder.detail(SERVICE_STATUS_CODE, e.statusCode()); - } - if (ObjectHelper.isNotEmpty(e.awsErrorDetails().errorCode())) { - builder.detail(SERVICE_ERROR_CODE, e.awsErrorDetails().errorCode()); - } - builder.down(); - return; - - } catch (Exception e) { - builder.error(e); - builder.down(); - return; - } - - builder.up(); - - } - -} diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckProfileCredsTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckProfileCredsTest.java index 509291f8bca..1ba70c0edfd 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckProfileCredsTest.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckProfileCredsTest.java @@ -93,14 +93,12 @@ public class Sqs2ConsumerHealthCheckProfileCredsTest extends CamelTestSupport { Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsAws2SqsHealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-sqs-consumer")) + .filter(result -> result.getCheck().getId().startsWith("consumer:test-health-it")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); + Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsAws2SqsHealthCheck, "aws2-sqs check"); - Assertions.assertTrue(hasRegionMessage, "aws2-sqs check error message"); }); } diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckStaticCredsTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckStaticCredsTest.java index 2ba1e4e435b..af5a0304e17 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckStaticCredsTest.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerHealthCheckStaticCredsTest.java @@ -93,14 +93,12 @@ public class Sqs2ConsumerHealthCheckStaticCredsTest extends CamelTestSupport { Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); boolean down = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); boolean containsAws2SqsHealthCheck = res2.stream() - .filter(result -> result.getCheck().getId().startsWith("aws2-sqs-consumer")) + .filter(result -> result.getCheck().getId().startsWith("consumer:test-health-it")) .findAny() .isPresent(); - boolean hasRegionMessage = res2.stream() - .anyMatch(r -> r.getMessage().stream().anyMatch(msg -> msg.contains("region"))); + Assertions.assertTrue(down, "liveness check"); Assertions.assertTrue(containsAws2SqsHealthCheck, "aws2-sqs check"); - Assertions.assertTrue(hasRegionMessage, "aws2-sqs check error message"); }); }