This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cad8ccf506f75716a9159b2976cf02dd051080bb
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Wed Apr 7 13:17:46 2021 +0200

    CAMEL-16465 - Camel-AWS: Add useDefaultCredentialsProvider option to all the components - Kinesis component
---
 .../aws2/kinesis/Kinesis2Configuration.java        |  11 +++
 .../component/aws2/kinesis/Kinesis2Endpoint.java   |   5 +-
 .../aws2/kinesis/client/KinesisClientFactory.java  |  41 ++++++++
 .../aws2/kinesis/client/KinesisInternalClient.java |  16 ++++
 .../impl/KinesisClientIAMOptimizedImpl.java}       | 103 ++++++---------------
 .../impl/KinesisClientStandardImpl.java}           |  96 +++++--------------
 6 files changed, 119 insertions(+), 153 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 3f80cd4..ec89c1e 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -77,6 +77,9 @@ public class Kinesis2Configuration implements Cloneable {
     @UriParam(label = "common",
               description = "Set the overriding uri endpoint. This option 
needs to be used in combination with overrideEndpoint option")
     private String uriEndpointOverride;
+    @UriParam(label = "common", defaultValue = "false", description = "Set 
whether the Kinesis client should expect to load credentials through a default 
credentials provider or to expect " +
+            "static credentials to be passed in.")
+    private boolean useDefaultCredentialsProvider;
 
     public KinesisClient getAmazonKinesisClient() {
         return amazonKinesisClient;
@@ -214,6 +217,14 @@ public class Kinesis2Configuration implements Cloneable {
         this.uriEndpointOverride = uriEndpointOverride;
     }
 
+    public boolean isUseDefaultCredentialsProvider() {
+        return useDefaultCredentialsProvider;
+    }
+
+    public void setUseDefaultCredentialsProvider(boolean 
useDefaultCredentialsProvider) {
+        this.useDefaultCredentialsProvider = useDefaultCredentialsProvider;
+    }
+
     // *************************************************
     //
     // *************************************************
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index ebe2e31..d0f819c 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -22,6 +22,7 @@ import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
@@ -64,8 +65,8 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
             System.setProperty(CBOR_ENABLED.property(), "false");
         }
         kinesisClient = configuration.getAmazonKinesisClient() != null
-                ? configuration.getAmazonKinesisClient() : 
createKinesisClient();
-
+                ? configuration.getAmazonKinesisClient() : 
KinesisClientFactory.getKinesisClient(configuration).getKinesisClient();
+        
         if 
((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
                 || 
configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
                 && configuration.getSequenceNumber().isEmpty()) {
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
new file mode 100644
index 0000000..7c8f1ef
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import 
org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMOptimizedImpl;
+import 
org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientStandardImpl;
+
+/**
+ * Static factory that selects which {@link KinesisInternalClient} implementation wraps a configuration.
+ */
+public final class KinesisClientFactory {
+
+    private KinesisClientFactory() {
+    }
+
+    /**
+     * Choose the client wrapper from the configuration's useDefaultCredentialsProvider flag.
+     * When the flag is true a KinesisClientIAMOptimizedImpl is created, otherwise a KinesisClientStandardImpl.
+     * @param  configuration configuration whose flag is read and which is handed to the chosen implementation
+     * @return               a newly constructed KinesisInternalClient
+     */
+    public static KinesisInternalClient getKinesisClient(Kinesis2Configuration 
configuration) {
+        return configuration.isUseDefaultCredentialsProvider()
+                ? new KinesisClientIAMOptimizedImpl(configuration) : new 
KinesisClientStandardImpl(configuration);
+    }
+}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java
new file mode 100644
index 0000000..849ef56
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisInternalClient.java
@@ -0,0 +1,16 @@
+package org.apache.camel.component.aws2.kinesis.client;
+
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+
+/**
+ * Contract exposing a single accessor through which callers obtain a {@link KinesisClient}.
+ */
+public interface KinesisInternalClient {
+
+    /**
+     * Returns the Kinesis client provided by this implementation.
+     *
+     * @return the KinesisClient
+     */
+    KinesisClient getKinesisClient();
+}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java
similarity index 53%
copy from 
components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
copy to 
components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java
index ebe2e31..1ebde1f 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientIAMOptimizedImpl.java
@@ -14,19 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.aws2.kinesis;
+package org.apache.camel.component.aws2.kinesis.client.impl;
 
-import java.net.URI;
-
-import org.apache.camel.Category;
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.http.SdkHttpClient;
 import software.amazon.awssdk.http.SdkHttpConfigurationOption;
@@ -35,80 +31,33 @@ import 
software.amazon.awssdk.http.apache.ProxyConfiguration;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 import software.amazon.awssdk.utils.AttributeMap;
 
-import static software.amazon.awssdk.core.SdkSystemSetting.CBOR_ENABLED;
+import java.net.URI;
 
 /**
- * Consume and produce records from and to AWS Kinesis Streams using AWS SDK 
version 2.x.
+ * Manage an AWS Kinesis client for all users to use (enabling temporary 
creds). This implementation is for remote instances
+ * to manage the credentials on their own (eliminating credential rotations)
  */
-@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis", title = "AWS 2 
Kinesis", syntax = "aws2-kinesis:streamName",
-             category = { Category.CLOUD, Category.MESSAGING })
-public class Kinesis2Endpoint extends ScheduledPollEndpoint {
-
-    @UriParam
+public class KinesisClientIAMOptimizedImpl implements KinesisInternalClient {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisClientIAMOptimizedImpl.class);
     private Kinesis2Configuration configuration;
 
-    private KinesisClient kinesisClient;
-
-    public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, 
Kinesis2Component component) {
-        super(uri, component);
+    /**
+     * Constructor that uses the config file.
+     */
+    public KinesisClientIAMOptimizedImpl(Kinesis2Configuration configuration) {
+        LOG.trace("Creating an AWS Kinesis client for an ec2 instance with IAM 
temporary credentials (normal for ec2s).");
         this.configuration = configuration;
     }
 
+    /**
+     * Getting the Kinesis client that is used.
+     *
+     * @return Amazon Kinesis Client.
+     */
     @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-        if (!configuration.isCborEnabled()) {
-            System.setProperty(CBOR_ENABLED.property(), "false");
-        }
-        kinesisClient = configuration.getAmazonKinesisClient() != null
-                ? configuration.getAmazonKinesisClient() : 
createKinesisClient();
-
-        if 
((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                || 
configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
-                && configuration.getSequenceNumber().isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Sequence Number must be specified with iterator Types 
AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
-        }
-    }
-
-    @Override
-    public void doStop() throws Exception {
-        if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) {
-            if (kinesisClient != null) {
-                kinesisClient.close();
-            }
-        }
-        if (!configuration.isCborEnabled()) {
-            System.clearProperty(CBOR_ENABLED.property());
-        }
-        super.doStop();
-    }
-
-    @Override
-    public Producer createProducer() throws Exception {
-        return new Kinesis2Producer(this);
-    }
-
-    @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        final Kinesis2Consumer consumer = new Kinesis2Consumer(this, 
processor);
-        consumer.setSchedulerProperties(getSchedulerProperties());
-        configureConsumer(consumer);
-        return consumer;
-    }
-
-    public KinesisClient getClient() {
-        return kinesisClient;
-    }
-
-    public Kinesis2Configuration getConfiguration() {
-        return configuration;
-    }
-
-    KinesisClient createKinesisClient() {
+    public KinesisClient getKinesisClient() {
         KinesisClient client = null;
         KinesisClientBuilder clientBuilder = KinesisClient.builder();
         ProxyConfiguration.Builder proxyConfig = null;
@@ -117,18 +66,18 @@ public class Kinesis2Endpoint extends 
ScheduledPollEndpoint {
         if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
             proxyConfig = ProxyConfiguration.builder();
             URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
+                    + configuration.getProxyPort());
             proxyConfig.endpoint(proxyEndpoint);
             httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
             isClientConfigFound = true;
         }
         if (configuration.getAccessKey() != null && 
configuration.getSecretKey() != null) {
-            AwsBasicCredentials cred = 
AwsBasicCredentials.create(configuration.getAccessKey(), 
configuration.getSecretKey());
+            DefaultCredentialsProvider cred = 
DefaultCredentialsProvider.create();
             if (isClientConfigFound) {
                 clientBuilder = 
clientBuilder.httpClientBuilder(httpClientBuilder)
-                        
.credentialsProvider(StaticCredentialsProvider.create(cred));
+                        .credentialsProvider(cred);
             } else {
-                clientBuilder = 
clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
+                clientBuilder = clientBuilder.credentialsProvider(cred);
             }
         } else {
             if (!isClientConfigFound) {
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java
similarity index 57%
copy from 
components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
copy to 
components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java
index ebe2e31..3d1b055 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisClientStandardImpl.java
@@ -14,18 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.aws2.kinesis;
+package org.apache.camel.component.aws2.kinesis.client.impl;
 
-import java.net.URI;
-
-import org.apache.camel.Category;
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.http.SdkHttpClient;
@@ -35,80 +30,33 @@ import 
software.amazon.awssdk.http.apache.ProxyConfiguration;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 import software.amazon.awssdk.utils.AttributeMap;
 
-import static software.amazon.awssdk.core.SdkSystemSetting.CBOR_ENABLED;
+import java.net.URI;
 
 /**
- * Consume and produce records from and to AWS Kinesis Streams using AWS SDK 
version 2.x.
+ * Manage an AWS Kinesis client for all users to use. This implementation is 
for local instances to use a static and solid
+ * credential set.
  */
-@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis", title = "AWS 2 
Kinesis", syntax = "aws2-kinesis:streamName",
-             category = { Category.CLOUD, Category.MESSAGING })
-public class Kinesis2Endpoint extends ScheduledPollEndpoint {
-
-    @UriParam
+public class KinesisClientStandardImpl implements KinesisInternalClient {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisClientStandardImpl.class);
     private Kinesis2Configuration configuration;
 
-    private KinesisClient kinesisClient;
-
-    public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, 
Kinesis2Component component) {
-        super(uri, component);
+    /**
+     * Constructor that uses the config file.
+     */
+    public KinesisClientStandardImpl(Kinesis2Configuration configuration) {
+        LOG.trace("Creating an AWS Kinesis manager using static credentials.");
         this.configuration = configuration;
     }
 
+    /**
+     * Getting the Kinesis client that is used.
+     * 
+     * @return Amazon Kinesis Client.
+     */
     @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-        if (!configuration.isCborEnabled()) {
-            System.setProperty(CBOR_ENABLED.property(), "false");
-        }
-        kinesisClient = configuration.getAmazonKinesisClient() != null
-                ? configuration.getAmazonKinesisClient() : 
createKinesisClient();
-
-        if 
((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                || 
configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
-                && configuration.getSequenceNumber().isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Sequence Number must be specified with iterator Types 
AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
-        }
-    }
-
-    @Override
-    public void doStop() throws Exception {
-        if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) {
-            if (kinesisClient != null) {
-                kinesisClient.close();
-            }
-        }
-        if (!configuration.isCborEnabled()) {
-            System.clearProperty(CBOR_ENABLED.property());
-        }
-        super.doStop();
-    }
-
-    @Override
-    public Producer createProducer() throws Exception {
-        return new Kinesis2Producer(this);
-    }
-
-    @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        final Kinesis2Consumer consumer = new Kinesis2Consumer(this, 
processor);
-        consumer.setSchedulerProperties(getSchedulerProperties());
-        configureConsumer(consumer);
-        return consumer;
-    }
-
-    public KinesisClient getClient() {
-        return kinesisClient;
-    }
-
-    public Kinesis2Configuration getConfiguration() {
-        return configuration;
-    }
-
-    KinesisClient createKinesisClient() {
+    public KinesisClient getKinesisClient() {
         KinesisClient client = null;
         KinesisClientBuilder clientBuilder = KinesisClient.builder();
         ProxyConfiguration.Builder proxyConfig = null;
@@ -117,7 +65,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
         if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && 
ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
             proxyConfig = ProxyConfiguration.builder();
             URI proxyEndpoint = URI.create(configuration.getProxyProtocol() + 
"://" + configuration.getProxyHost() + ":"
-                                           + configuration.getProxyPort());
+                    + configuration.getProxyPort());
             proxyConfig.endpoint(proxyEndpoint);
             httpClientBuilder = 
ApacheHttpClient.builder().proxyConfiguration(proxyConfig.build());
             isClientConfigFound = true;

Reply via email to