This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 3701f2d CAMEL-12172 - Camel-AWS Kinesis: Add the ability to specify credentials and region at component level 3701f2d is described below commit 3701f2d21052799a5d907dfe3776e1c014062ff0 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Jan 22 09:57:03 2018 +0100 CAMEL-12172 - Camel-AWS Kinesis: Add the ability to specify credentials and region at component level --- .../src/main/docs/aws-kinesis-component.adoc | 14 +- .../component/aws/kinesis/KinesisComponent.java | 69 ++++++++- .../aws/kinesis/KinesisConfiguration.java | 16 +- .../kinesis/KinesisComponentConfigurationTest.java | 28 ++++ .../springboot/KinesisComponentConfiguration.java | 165 +++++++++++++++++++++ 5 files changed, 289 insertions(+), 3 deletions(-) diff --git a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc index 559a89c..a570cb2 100644 --- a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc +++ b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc @@ -26,7 +26,19 @@ The stream needs to be created prior to it being used. + // component options: START -The AWS Kinesis component has no options. +The AWS Kinesis component supports 5 options which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *configuration* (advanced) | The AWS S3 default configuration | | KinesisConfiguration +| *accessKey* (common) | Amazon AWS Access Key | | String +| *secretKey* (common) | Amazon AWS Secret Key | | String +| *region* (common) | Amazon AWS Region | | String +| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. 
| true | boolean +|=== // component options: END diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java index 34789af..e1a3385 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java @@ -21,24 +21,91 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.spi.Metadata; +import org.apache.camel.util.ObjectHelper; public class KinesisComponent extends DefaultComponent { + @Metadata + private String accessKey; + @Metadata + private String secretKey; + @Metadata + private String region; + @Metadata(label = "advanced") + private KinesisConfiguration configuration; + public KinesisComponent() { this(null); } public KinesisComponent(CamelContext context) { super(context); + + this.configuration = new KinesisConfiguration(); } @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - KinesisConfiguration configuration = new KinesisConfiguration(); + KinesisConfiguration configuration = this.configuration.copy(); configuration.setStreamName(remaining); setProperties(configuration, parameters); + if (ObjectHelper.isEmpty(configuration.getAccessKey())) { + setAccessKey(accessKey); + } + if (ObjectHelper.isEmpty(configuration.getSecretKey())) { + setSecretKey(secretKey); + } + if (ObjectHelper.isEmpty(configuration.getRegion())) { + setRegion(region); + } + KinesisEndpoint endpoint = new KinesisEndpoint(uri, configuration, this); return endpoint; } + + public KinesisConfiguration getConfiguration() { + return configuration; + } + + /** + * The AWS S3 default configuration + */ + public void 
setConfiguration(KinesisConfiguration configuration) { + this.configuration = configuration; + } + + public String getAccessKey() { + return configuration.getAccessKey(); + } + + /** + * Amazon AWS Access Key + */ + public void setAccessKey(String accessKey) { + configuration.setAccessKey(accessKey); + } + + public String getSecretKey() { + return configuration.getSecretKey(); + } + + /** + * Amazon AWS Secret Key + */ + public void setSecretKey(String secretKey) { + configuration.setSecretKey(secretKey); + } + + public String getRegion() { + return configuration.getRegion(); + } + + /** + * Amazon AWS Region + */ + public void setRegion(String region) { + configuration.setRegion(region); + } } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java index a6d38e3..820492b 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java @@ -19,12 +19,14 @@ package org.apache.camel.component.aws.kinesis; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; + @UriParams -public class KinesisConfiguration { +public class KinesisConfiguration implements Cloneable { @UriPath(description = "Name of the stream") @Metadata(required = "true") @@ -150,4 +152,16 @@ public class KinesisConfiguration { public void setProxyPort(Integer proxyPort) { this.proxyPort = proxyPort; } + + // ************************************************* + // + // ************************************************* + + public 
KinesisConfiguration copy() { + try { + return (KinesisConfiguration)super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } } diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java index 0e45169..20bc912 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.aws.kinesis; +import com.amazonaws.regions.Regions; + import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; @@ -31,4 +33,30 @@ public class KinesisComponentConfigurationTest extends CamelTestSupport { assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); } + @Test + public void createEndpointWithComponentElements() throws Exception { + KinesisComponent component = new KinesisComponent(context); + component.setAccessKey("XXX"); + component.setSecretKey("YYY"); + KinesisEndpoint endpoint = (KinesisEndpoint)component.createEndpoint("aws-kinesis://some_stream_name"); + + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); + assertEquals("XXX", endpoint.getConfiguration().getAccessKey()); + assertEquals("YYY", endpoint.getConfiguration().getSecretKey()); + } + + @Test + public void createEndpointWithComponentAndEndpointElements() throws Exception { + KinesisComponent component = new KinesisComponent(context); + component.setAccessKey("XXX"); + component.setSecretKey("YYY"); + component.setRegion(Regions.US_WEST_1.toString()); + KinesisEndpoint endpoint = (KinesisEndpoint)component.createEndpoint("aws-kinesis://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1"); + 
assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); + assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey()); + assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); + assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion()); + } + } \ No newline at end of file diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java index 464ab9f..197461b 100644 --- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java @@ -17,6 +17,10 @@ package org.apache.camel.component.aws.kinesis.springboot; import javax.annotation.Generated; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.camel.component.aws.kinesis.KinesisComponent; +import org.apache.camel.component.aws.kinesis.KinesisShardClosedStrategyEnum; import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -33,12 +37,61 @@ public class KinesisComponentConfiguration ComponentConfigurationPropertiesCommon { /** + * The AWS S3 default configuration + */ + private KinesisConfigurationNestedConfiguration configuration; + /** + * Amazon AWS Access Key + */ + private String accessKey; + /** + * Amazon AWS Secret Key + */ + private String secretKey; + /** + * Amazon AWS Region + */ + private String region; + /** * Whether the component should resolve property 
placeholders on itself when * starting. Only properties which are of String type can use property * placeholders. */ private Boolean resolvePropertyPlaceholders = true; + public KinesisConfigurationNestedConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration( + KinesisConfigurationNestedConfiguration configuration) { + this.configuration = configuration; + } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + public Boolean getResolvePropertyPlaceholders() { return resolvePropertyPlaceholders; } @@ -47,4 +100,116 @@ public class KinesisComponentConfiguration Boolean resolvePropertyPlaceholders) { this.resolvePropertyPlaceholders = resolvePropertyPlaceholders; } + + public static class KinesisConfigurationNestedConfiguration { + public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.aws.kinesis.KinesisConfiguration.class; + private AmazonKinesis amazonKinesisClient; + private Integer maxResultsPerRequest = 1; + private String streamName; + private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON; + private String shardId; + private String sequenceNumber; + private KinesisShardClosedStrategyEnum shardClosed = KinesisShardClosedStrategyEnum.ignore; + private String accessKey; + private String secretKey; + private String region; + private String proxyHost; + private Integer proxyPort; + + public AmazonKinesis getAmazonKinesisClient() { + return amazonKinesisClient; + } + + public void setAmazonKinesisClient(AmazonKinesis amazonKinesisClient) { + this.amazonKinesisClient = amazonKinesisClient; + } + + public Integer getMaxResultsPerRequest() { + 
return maxResultsPerRequest; + } + + public void setMaxResultsPerRequest(Integer maxResultsPerRequest) { + this.maxResultsPerRequest = maxResultsPerRequest; + } + + public String getStreamName() { + return streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public ShardIteratorType getIteratorType() { + return iteratorType; + } + + public void setIteratorType(ShardIteratorType iteratorType) { + this.iteratorType = iteratorType; + } + + public String getShardId() { + return shardId; + } + + public void setShardId(String shardId) { + this.shardId = shardId; + } + + public String getSequenceNumber() { + return sequenceNumber; + } + + public void setSequenceNumber(String sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public KinesisShardClosedStrategyEnum getShardClosed() { + return shardClosed; + } + + public void setShardClosed(KinesisShardClosedStrategyEnum shardClosed) { + this.shardClosed = shardClosed; + } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Integer getProxyPort() { + return proxyPort; + } + + public void setProxyPort(Integer proxyPort) { + this.proxyPort = proxyPort; + } + } } \ No newline at end of file -- To stop receiving notification emails like this one, please contact acosent...@apache.org.