This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit cad8ccf506f75716a9159b2976cf02dd051080bb Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Apr 7 13:17:46 2021 +0200 CAMEL-16465 - Camel-AWS: Add useDefaultCredentialProvider option to all the components - Kinesis component --- .../aws2/kinesis/Kinesis2Configuration.java | 11 +++ .../component/aws2/kinesis/Kinesis2Endpoint.java | 5 +- .../aws2/kinesis/client/KinesisClientFactory.java | 41 ++++++++ .../aws2/kinesis/client/KinesisInternalClient.java | 16 ++++ .../impl/KinesisClientIAMOptimizedImpl.java} | 103 ++++++--------------- .../impl/KinesisClientStandardImpl.java} | 96 +++++-------------- 6 files changed, 119 insertions(+), 153 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java index 3f80cd4..ec89c1e 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java @@ -77,6 +77,9 @@ public class Kinesis2Configuration implements Cloneable { @UriParam(label = "common", description = "Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option") private String uriEndpointOverride; + @UriParam(label = "common", defaultValue = "false", description = "Set whether the Kinesis client should expect to load credentials through a default credentials provider or to expect " + + "static credentials to be passed in.") + private boolean useDefaultCredentialsProvider; public KinesisClient getAmazonKinesisClient() { return amazonKinesisClient; @@ -214,6 +217,14 @@ public class Kinesis2Configuration implements Cloneable { this.uriEndpointOverride = uriEndpointOverride; } + public boolean isUseDefaultCredentialsProvider() { + return useDefaultCredentialsProvider; + } + + public void setUseDefaultCredentialsProvider(boolean useDefaultCredentialsProvider) { + this.useDefaultCredentialsProvider = useDefaultCredentialsProvider; + } + // ************************************************* // // ************************************************* diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java index ebe2e31..d0f819c 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java @@ -22,6 +22,7 @@ import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.support.ScheduledPollEndpoint; @@ -64,8 +65,8 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { System.setProperty(CBOR_ENABLED.property(), "false"); } kinesisClient = configuration.getAmazonKinesisClient() != null - ? configuration.getAmazonKinesisClient() : createKinesisClient(); - + ? configuration.getAmazonKinesisClient() : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient(); + if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && configuration.getSequenceNumber().isEmpty()) { diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java new file mode 100644 index 0000000..7c8f1ef --- /dev/null +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.kinesis.client; + +import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; +import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMOptimizedImpl; +import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientStandardImpl; + +/** + * Factory class to return the correct type of AWS Kinesis client. + */ +public final class KinesisClientFactory { + + private KinesisClientFactory() { + } + + /** + * Return the correct aws Kinesis client (based on remote vs local). + * + * @param configuration configuration + * @return KinesisClient + */ + public static KinesisInternalClient getKinesisClient(Kinesis2Configuration configuration) { + return configuration.isUseDefaultCredentialsProvider() + ? new KinesisClientIAMOptimizedImpl(configuration) : new KinesisClientStandardImpl(configuration); + } +} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java new file mode 100644 index 0000000..849ef56 --- /dev/null +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java @@ -0,0 +1,16 @@ +package org.apache.camel.component.aws2.kinesis.client; + +import software.amazon.awssdk.services.kinesis.KinesisClient; + +/** + * Manage the required actions of a Kinesis client for either local or remote. + */ +public interface KinesisInternalClient { + + /** + * Returns a Kinesis client after a factory method determines which one to return. + * + * @return KinesisClient client + */ + KinesisClient getKinesisClient(); +} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java similarity index 53% copy from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java copy to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java index ebe2e31..1ebde1f 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java @@ -14,19 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.aws2.kinesis; +package org.apache.camel.component.aws2.kinesis.client.impl; -import java.net.URI; - -import org.apache.camel.Category; -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.apache.camel.support.ScheduledPollEndpoint; +import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; +import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient; import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.SdkHttpConfigurationOption; @@ -35,80 +31,33 @@ import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.utils.AttributeMap; -import static software.amazon.awssdk.core.SdkSystemSetting.CBOR_ENABLED; +import java.net.URI; /** - * Consume and produce records from and to AWS Kinesis Streams using AWS SDK version 2.x. + * Manage an AWS Kinesis client for all users to use (enabling temporary creds). This implementation is for remote instances + * to manage the credentials on their own (eliminating credential rotations) */ -@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis", title = "AWS 2 Kinesis", syntax = "aws2-kinesis:streamName", - category = { Category.CLOUD, Category.MESSAGING }) -public class Kinesis2Endpoint extends ScheduledPollEndpoint { - - @UriParam +public class KinesisClientIAMOptimizedImpl implements KinesisInternalClient { + private static final Logger LOG = LoggerFactory.getLogger(KinesisClientIAMOptimizedImpl.class); private Kinesis2Configuration configuration; - private KinesisClient kinesisClient; - - public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) { - super(uri, component); + /** + * Constructor that uses the config file. + */ + public KinesisClientIAMOptimizedImpl(Kinesis2Configuration configuration) { + LOG.trace("Creating an AWS Kinesis client for an ec2 instance with IAM temporary credentials (normal for ec2s)."); this.configuration = configuration; } + /** + * Getting the Kinesis client that is used. + * + * @return Amazon Kinesis Client. + */ @Override - protected void doStart() throws Exception { - super.doStart(); - if (!configuration.isCborEnabled()) { - System.setProperty(CBOR_ENABLED.property(), "false"); - } - kinesisClient = configuration.getAmazonKinesisClient() != null - ? configuration.getAmazonKinesisClient() : createKinesisClient(); - - if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) - && configuration.getSequenceNumber().isEmpty()) { - throw new IllegalArgumentException( - "Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER"); - } - } - - @Override - public void doStop() throws Exception { - if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) { - if (kinesisClient != null) { - kinesisClient.close(); - } - } - if (!configuration.isCborEnabled()) { - System.clearProperty(CBOR_ENABLED.property()); - } - super.doStop(); - } - - @Override - public Producer createProducer() throws Exception { - return new Kinesis2Producer(this); - } - - @Override - public Consumer createConsumer(Processor processor) throws Exception { - final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor); - consumer.setSchedulerProperties(getSchedulerProperties()); - configureConsumer(consumer); - return consumer; - } - - public KinesisClient getClient() { - return kinesisClient; - } - - public Kinesis2Configuration getConfiguration() { - return configuration; - } - - KinesisClient createKinesisClient() { + public KinesisClient getKinesisClient() { KinesisClient client = null; KinesisClientBuilder clientBuilder = KinesisClient.builder(); ProxyConfiguration.Builder proxyConfig = null; @@ -117,18 +66,18 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { proxyConfig = ProxyConfiguration.builder(); URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); + + configuration.getProxyPort()); proxyConfig.endpoint(proxyEndpoint); httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); isClientConfigFound = true; } if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) { - AwsBasicCredentials cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey()); + DefaultCredentialsProvider cred = DefaultCredentialsProvider.create(); if (isClientConfigFound) { clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder) - .credentialsProvider(StaticCredentialsProvider.create(cred)); + .credentialsProvider(cred); } else { - clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred)); + clientBuilder = clientBuilder.credentialsProvider(cred); } } else { if (!isClientConfigFound) { diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java similarity index 57% copy from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java copy to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java index ebe2e31..3d1b055 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java @@ -14,18 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.aws2.kinesis; +package org.apache.camel.component.aws2.kinesis.client.impl; -import java.net.URI; - -import org.apache.camel.Category; -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.spi.UriEndpoint; -import org.apache.camel.spi.UriParam; -import org.apache.camel.support.ScheduledPollEndpoint; +import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; +import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient; import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.http.SdkHttpClient; @@ -35,80 +30,33 @@ import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.utils.AttributeMap; -import static software.amazon.awssdk.core.SdkSystemSetting.CBOR_ENABLED; +import java.net.URI; /** - * Consume and produce records from and to AWS Kinesis Streams using AWS SDK version 2.x. + * Manage an AWS Kinesis client for all users to use. This implementation is for local instances to use a static and solid + * credential set. */ -@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis", title = "AWS 2 Kinesis", syntax = "aws2-kinesis:streamName", - category = { Category.CLOUD, Category.MESSAGING }) -public class Kinesis2Endpoint extends ScheduledPollEndpoint { - - @UriParam +public class KinesisClientStandardImpl implements KinesisInternalClient { + private static final Logger LOG = LoggerFactory.getLogger(KinesisClientStandardImpl.class); private Kinesis2Configuration configuration; - private KinesisClient kinesisClient; - - public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) { - super(uri, component); + /** + * Constructor that uses the config file. + */ + public KinesisClientStandardImpl(Kinesis2Configuration configuration) { + LOG.trace("Creating an AWS Kinesis manager using static credentials."); this.configuration = configuration; } + /** + * Getting the Kinesis client that is used. + * + * @return Amazon Kinesis Client. + */ @Override - protected void doStart() throws Exception { - super.doStart(); - if (!configuration.isCborEnabled()) { - System.setProperty(CBOR_ENABLED.property(), "false"); - } - kinesisClient = configuration.getAmazonKinesisClient() != null - ? configuration.getAmazonKinesisClient() : createKinesisClient(); - - if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) - && configuration.getSequenceNumber().isEmpty()) { - throw new IllegalArgumentException( - "Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER"); - } - } - - @Override - public void doStop() throws Exception { - if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) { - if (kinesisClient != null) { - kinesisClient.close(); - } - } - if (!configuration.isCborEnabled()) { - System.clearProperty(CBOR_ENABLED.property()); - } - super.doStop(); - } - - @Override - public Producer createProducer() throws Exception { - return new Kinesis2Producer(this); - } - - @Override - public Consumer createConsumer(Processor processor) throws Exception { - final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor); - consumer.setSchedulerProperties(getSchedulerProperties()); - configureConsumer(consumer); - return consumer; - } - - public KinesisClient getClient() { - return kinesisClient; - } - - public Kinesis2Configuration getConfiguration() { - return configuration; - } - - KinesisClient createKinesisClient() { + public KinesisClient getKinesisClient() { KinesisClient client = null; KinesisClientBuilder clientBuilder = KinesisClient.builder(); ProxyConfiguration.Builder proxyConfig = null; @@ -117,7 +65,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { proxyConfig = ProxyConfiguration.builder(); URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + "://" + configuration.getProxyHost() + ":" - + configuration.getProxyPort()); + + configuration.getProxyPort()); proxyConfig.endpoint(proxyEndpoint); httpClientBuilder = ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build()); isClientConfigFound = true;