This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4a16e066b846fc2bbfbb964e425703cb9c8501f8 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Jan 22 11:53:26 2018 +0100 CAMEL-12175 - Camel-AWS Kinesis Firehose: Expose options to avoid a required client in the registry --- .../aws/firehose/KinesisFirehoseConfiguration.java | 51 ++++++++++++++++++++- .../aws/firehose/KinesisFirehoseEndpoint.java | 53 +++++++++++++++++++++- .../aws/firehose/KinesisFirehoseEndpointTest.java | 14 ++++++ 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java index 7c6201f..699a2ca 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java @@ -29,9 +29,18 @@ public class KinesisFirehoseConfiguration { @UriPath(description = "Name of the stream") @Metadata(required = "true") private String streamName; + @UriParam(label = "security", secret = true, description = "Amazon AWS Access Key") + private String accessKey; + @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key") + private String secretKey; + @UriParam(description = "The region in which Kinesis client needs to work") + private String region; @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint") - @Metadata(required = "true") private AmazonKinesisFirehose amazonKinesisFirehoseClient; + @UriParam(description = "To define a proxy host when instantiating the DDBStreams client") + private String proxyHost; + @UriParam(description = "To define a proxy port when instantiating the DDBStreams client") + private Integer proxyPort; public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) 
{ this.amazonKinesisFirehoseClient = client; @@ -48,4 +57,44 @@ public class KinesisFirehoseConfiguration { public String getStreamName() { return streamName; } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Integer getProxyPort() { + return proxyPort; + } + + public void setProxyPort(Integer proxyPort) { + this.proxyPort = proxyPort; + } } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java index cdeb4d4..3f5fafe 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java @@ -16,13 +16,21 @@ */ package org.apache.camel.component.aws.firehose; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder; + import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.UriEndpoint; import 
org.apache.camel.spi.UriParam; +import org.apache.camel.util.ObjectHelper; /** * The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams. @@ -33,6 +41,8 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint { @UriParam private KinesisFirehoseConfiguration configuration; + + private AmazonKinesisFirehose kinesisFirehoseClient; public KinesisFirehoseEndpoint(String uri, KinesisFirehoseConfiguration configuration, KinesisFirehoseComponent component) { super(uri, component); @@ -48,14 +58,55 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint { public Consumer createConsumer(Processor processor) throws Exception { throw new UnsupportedOperationException("You cannot consume messages from this endpoint"); } + + @Override + protected void doStart() throws Exception { + super.doStart(); + kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient() + : createKinesisFirehoseClient(); + + } @Override public boolean isSingleton() { return true; } + + AmazonKinesisFirehose createKinesisFirehoseClient() { + AmazonKinesisFirehose client = null; + ClientConfiguration clientConfiguration = null; + AmazonKinesisFirehoseClientBuilder clientBuilder = null; + boolean isClientConfigFound = false; + if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { + clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProxyHost(configuration.getProxyHost()); + clientConfiguration.setProxyPort(configuration.getProxyPort()); + isClientConfigFound = true; + } + if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) { + AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey()); + AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); + if (isClientConfigFound) { + clientBuilder = 
AmazonKinesisFirehoseClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider); + } else { + clientBuilder = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(credentialsProvider); + } + } else { + if (isClientConfigFound) { + clientBuilder = AmazonKinesisFirehoseClientBuilder.standard(); + } else { + clientBuilder = AmazonKinesisFirehoseClientBuilder.standard().withClientConfiguration(clientConfiguration); + } + } + if (ObjectHelper.isNotEmpty(configuration.getRegion())) { + clientBuilder = clientBuilder.withRegion(configuration.getRegion()); + } + client = clientBuilder.build(); + return client; + } public AmazonKinesisFirehose getClient() { - return configuration.getAmazonKinesisFirehoseClient(); + return kinesisFirehoseClient; } public KinesisFirehoseConfiguration getConfiguration() { diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java index ac8cdf1..391bb5d 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.aws.firehose; +import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultCamelContext; @@ -49,8 +50,21 @@ public class KinesisFirehoseEndpointTest { KinesisFirehoseEndpoint endpoint = (KinesisFirehoseEndpoint) camelContext.getEndpoint("aws-kinesis-firehose://some_stream_name" + "?amazonKinesisFirehoseClient=#firehoseClient" ); + endpoint.start(); assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient)); 
assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); } + + @Test + public void allClientCreationParams() throws Exception { + KinesisFirehoseEndpoint endpoint = (KinesisFirehoseEndpoint) camelContext.getEndpoint("aws-kinesis-firehose://some_stream_name" + + "?accessKey=xxx&secretKey=yyy&region=us-east-1" + ); + + assertThat(endpoint.getConfiguration().getRegion(), is(Regions.US_EAST_1.getName())); + assertThat(endpoint.getConfiguration().getAccessKey(), is("xxx")); + assertThat(endpoint.getConfiguration().getSecretKey(), is("yyy")); + assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); + } } -- To stop receiving notification emails like this one, please contact acosent...@apache.org.