This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 80c410097a3f79844d9c9b678b4ca54494dcb2c6 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Feb 28 12:20:20 2019 +0100 CAMEL-13165 - Camel-AWS: Create an AWS MSK component --- components/camel-aws-msk/bin/pom.xml | 81 +++++++++ .../bin/src/main/docs/aws-kms-component.adoc | 182 +++++++++++++++++++++ .../camel/component/aws/kms/MSKComponent.class | Bin 0 -> 6580 bytes .../aws/kms/MSKComponentVerifierExtension.class | Bin 0 -> 5686 bytes .../camel/component/aws/kms/MSKConfiguration.class | Bin 0 -> 5398 bytes .../camel/component/aws/kms/MSKConstants.class | Bin 0 -> 799 bytes .../camel/component/aws/kms/MSKEndpoint.class | Bin 0 -> 8731 bytes .../camel/component/aws/kms/MSKOperations.class | Bin 0 -> 1146 bytes .../camel/component/aws/kms/MSKProducer.class | Bin 0 -> 5348 bytes .../component/aws/kms/AmazonMKSClientMock.class | Bin 0 -> 775 bytes .../component/aws/kms/KMSProducerSpringTest.class | Bin 0 -> 9622 bytes .../camel/component/aws/kms/KMSProducerTest.class | Bin 0 -> 9526 bytes .../aws/kms/MSKComponentConfigurationTest.class | Bin 0 -> 2491 bytes .../kms/MSKComponentVerifierExtensionTest.class | Bin 0 -> 3883 bytes .../bin/src/test/resources/log4j2.properties | 28 ++++ .../aws/kms/KMSComponentSpringTest-context.xml | 60 +++++++ components/camel-aws-msk/pom.xml | 81 +++++++++ .../src/main/docs/aws-kms-component.adoc | 182 +++++++++++++++++++++ .../camel/component/aws/msk/MSKComponent.java | 121 ++++++++++++++ .../aws/msk/MSKComponentVerifierExtension.java | 89 ++++++++++ .../camel/component/aws/msk/MSKConfiguration.java | 137 ++++++++++++++++ .../camel/component/aws/msk/MSKConstants.java | 30 ++++ .../camel/component/aws/msk/MSKEndpoint.java | 121 ++++++++++++++ .../camel/component/aws/msk/MSKOperations.java | 24 +++ .../camel/component/aws/msk/MSKProducer.java | 170 +++++++++++++++++++ .../component/aws/msk/AmazonMSKClientMock.java | 65 ++++++++ .../aws/msk/MSKComponentConfigurationTest.java | 53 ++++++ .../aws/msk/MSKComponentVerifierExtensionTest.java | 74 +++++++++ .../component/aws/msk/MSKProducerSpringTest.java | 104 ++++++++++++ .../camel/component/aws/msk/MSKProducerTest.java | 129 +++++++++++++++ .../src/test/resources/log4j2.properties | 28 ++++ .../aws/msk/MSKComponentSpringTest-context.xml | 47 ++++++ platforms/spring-boot/components-starter/pom.xml | 1 + 33 files changed, 1807 insertions(+) diff --git a/components/camel-aws-msk/bin/pom.xml b/components/camel-aws-msk/bin/pom.xml new file mode 100644 index 0000000..c29ca1d --- /dev/null +++ b/components/camel-aws-msk/bin/pom.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>3.0.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-aws-msk</artifactId> + <packaging>jar</packaging> + + <name>Camel :: AWS MSK</name> + <description>A Camel Amazon MSK Web Service Component</description> + + <properties> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-support</artifactId> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-kafka</artifactId> + <version>${aws-java-sdk-version}</version> + </dependency> + + <!-- for testing --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc b/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc new file mode 100644 index 0000000..77f8a26 --- /dev/null +++ b/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc @@ -0,0 +1,182 @@ +[[aws-kms-component]] +== AWS KMS Component + +*Available as of Camel version 2.21* + +The KMS component supports create, run, start, stop and terminate +https://aws.amazon.com/it/kms/[AWS KMS] instances. + +Prerequisites + +You must have a valid Amazon Web Services developer account, and be +signed up to use Amazon KMS. More information are available at +https://aws.amazon.com/it/mq/[Amazon KMS]. + +### URI Format + +[source,java] +------------------------- +aws-kms://label[?options] +------------------------- + +You can append query options to the URI in the following format, +?options=value&option2=value&... + +### URI Options + + +// component options: START +The AWS KMS component supports 5 options, which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *configuration* (advanced) | The AWS KMS default configuration | | KMSConfiguration +| *accessKey* (producer) | Amazon AWS Access Key | | String +| *secretKey* (producer) | Amazon AWS Secret Key | | String +| *region* (producer) | The region in which KMS client needs to work | | String +| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean +|=== +// component options: END + + + + +// endpoint options: START +The AWS KMS endpoint is configured using URI syntax: + +---- +aws-kms:label +---- + +with the following path and query parameters: + +==== Path Parameters (1 parameters): + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *label* | *Required* Logical name | | String +|=== + + +==== Query Parameters (8 parameters): + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *accessKey* (producer) | Amazon AWS Access Key | | String +| *kmsClient* (producer) | To use a existing configured AWS KMS as client | | AWSKMS +| *operation* (producer) | *Required* The operation to perform | | KMSOperations +| *proxyHost* (producer) | To define a proxy host when instantiating the KMS client | | String +| *proxyPort* (producer) | To define a proxy port when instantiating the KMS client | | Integer +| *region* (producer) | The region in which KMS client needs to work | | String +| *secretKey* (producer) | Amazon AWS Secret Key | | String +| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean +|=== +// endpoint options: END +// spring-boot-auto-configure options: START +=== Spring Boot Auto-Configuration + +When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration: + +[source,xml] +---- +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws-kms-starter</artifactId> + <version>x.x.x</version> + <!-- use the same version as your Camel core version --> +</dependency> +---- + + +The component supports 12 options, which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *camel.component.aws-kms.access-key* | Amazon AWS Access Key | | String +| *camel.component.aws-kms.configuration.access-key* | Amazon AWS Access Key | | String +| *camel.component.aws-kms.configuration.kms-client* | To use a existing configured AWS KMS as client | | AWSKMS +| *camel.component.aws-kms.configuration.operation* | The operation to perform | | KMSOperations +| *camel.component.aws-kms.configuration.proxy-host* | To define a proxy host when instantiating the KMS client | | String +| *camel.component.aws-kms.configuration.proxy-port* | To define a proxy port when instantiating the KMS client | | Integer +| *camel.component.aws-kms.configuration.region* | The region in which KMS client needs to work | | String +| *camel.component.aws-kms.configuration.secret-key* | Amazon AWS Secret Key | | String +| *camel.component.aws-kms.enabled* | Whether to enable auto configuration of the aws-kms component. This is enabled by default. | | Boolean +| *camel.component.aws-kms.region* | The region in which KMS client needs to work | | String +| *camel.component.aws-kms.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean +| *camel.component.aws-kms.secret-key* | Amazon AWS Secret Key | | String +|=== +// spring-boot-auto-configure options: END + + + + +Required KMS component options + +You have to provide the amazonKmsClient in the +Registry or your accessKey and secretKey to access +the https://aws.amazon.com/it/kms/[Amazon KMS] service. + +### Usage + +#### Message headers evaluated by the MQ producer + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Header |Type |Description + +|`CamelAwsKMSLimit` |`Integer` |The limit number of keys to return while performing a listKeys operation + +|`CamelAwsKMSOperation` |`String` |The operation we want to perform + +|`CamelAwsKMSDescription` |`String` |A key description to use while performing a createKey operation + +|`CamelAwsKMSKeyId` |`String` |The key Id +|======================================================================= + +#### KMS Producer operations + +Camel-AWS KMS component provides the following operation on the producer side: + +- listKeys +- createKey +- disableKey +- scheduleKeyDeletion +- describeKey +- enableKey + +Dependencies + +Maven users will need to add the following dependency to their pom.xml. + +*pom.xml* + +[source,xml] +--------------------------------------- +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws-kms</artifactId> + <version>${camel-version}</version> +</dependency> +--------------------------------------- + +where `${camel-version`} must be replaced by the actual version of Camel +(2.16 or higher). + +### See Also + +* Configuring Camel +* Component +* Endpoint +* Getting Started + +* AWS Component diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class new file mode 100644 index 0000000..c7d4963 Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class differ diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class new file mode 100644 index 0000000..144ae2e Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class differ diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class new file mode 100644 index 0000000..dbf1478 Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class differ diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class new file mode 100644 index 0000000..fb77aaf Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class differ diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class new file mode 100644 index 0000000..7fd6129 Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class differ diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class new file mode 100644 index 0000000..16ec708 Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class differ diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class new file mode 100644 index 0000000..6c8b42e Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class differ diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class new file mode 100644 index 0000000..2a5517c Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class differ diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class new file mode 100644 index 0000000..5a6a3fe Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class differ diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class new file mode 100644 index 0000000..50605ab Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class differ diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class new file mode 100644 index 0000000..0e6e223 Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class differ diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class new file mode 100644 index 0000000..e161b81 Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class differ diff --git a/components/camel-aws-msk/bin/src/test/resources/log4j2.properties b/components/camel-aws-msk/bin/src/test/resources/log4j2.properties new file mode 100644 index 0000000..986f470 --- /dev/null +++ b/components/camel-aws-msk/bin/src/test/resources/log4j2.properties @@ -0,0 +1,28 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-aws-kms-test.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = file diff --git a/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml b/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml new file mode 100644 index 0000000..42eccfd --- /dev/null +++ b/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:listKeys"/> + <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&operation=listKeys"/> + <to uri="mock:result"/> + </route> + <route> + <from uri="direct:createKey"/> + <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&operation=createKey"/> + <to uri="mock:result"/> + </route> + <route> + <from uri="direct:disableKey"/> + <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&operation=disableKey"/> + <to uri="mock:result"/> + </route> + <route> + <from uri="direct:enableKey"/> + <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&operation=enableKey"/> + <to uri="mock:result"/> + </route> + <route> + <from uri="direct:scheduleDelete"/> + <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&operation=scheduleKeyDeletion"/> + <to uri="mock:result"/> + </route> + <route> + <from uri="direct:describeKey"/> + <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&operation=describeKey"/> + <to uri="mock:result"/> + </route> + </camelContext> + + <bean id="amazonKmsClient" class="org.apache.camel.component.aws.kms.AmazonKMSClientMock"/> +</beans> \ No newline at end of file diff --git a/components/camel-aws-msk/pom.xml b/components/camel-aws-msk/pom.xml new file mode 100644 index 0000000..c29ca1d --- /dev/null +++ b/components/camel-aws-msk/pom.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>3.0.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-aws-msk</artifactId> + <packaging>jar</packaging> + + <name>Camel :: AWS MSK</name> + <description>A Camel Amazon MSK Web Service Component</description> + + <properties> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-support</artifactId> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-kafka</artifactId> + <version>${aws-java-sdk-version}</version> + </dependency> + + <!-- for testing --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc b/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc new file mode 100644 index 0000000..77f8a26 --- /dev/null +++ b/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc @@ -0,0 +1,182 @@ +[[aws-kms-component]] +== AWS KMS Component + +*Available as of Camel version 2.21* + +The KMS component supports create, run, start, stop and terminate +https://aws.amazon.com/it/kms/[AWS KMS] instances. + +Prerequisites + +You must have a valid Amazon Web Services developer account, and be +signed up to use Amazon KMS. More information are available at +https://aws.amazon.com/it/mq/[Amazon KMS]. + +### URI Format + +[source,java] +------------------------- +aws-kms://label[?options] +------------------------- + +You can append query options to the URI in the following format, +?options=value&option2=value&... + +### URI Options + + +// component options: START +The AWS KMS component supports 5 options, which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *configuration* (advanced) | The AWS KMS default configuration | | KMSConfiguration +| *accessKey* (producer) | Amazon AWS Access Key | | String +| *secretKey* (producer) | Amazon AWS Secret Key | | String +| *region* (producer) | The region in which KMS client needs to work | | String +| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean +|=== +// component options: END + + + + +// endpoint options: START +The AWS KMS endpoint is configured using URI syntax: + +---- +aws-kms:label +---- + +with the following path and query parameters: + +==== Path Parameters (1 parameters): + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *label* | *Required* Logical name | | String +|=== + + +==== Query Parameters (8 parameters): + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *accessKey* (producer) | Amazon AWS Access Key | | String +| *kmsClient* (producer) | To use a existing configured AWS KMS as client | | AWSKMS +| *operation* (producer) | *Required* The operation to perform | | KMSOperations +| *proxyHost* (producer) | To define a proxy host when instantiating the KMS client | | String +| *proxyPort* (producer) | To define a proxy port when instantiating the KMS client | | Integer +| *region* (producer) | The region in which KMS client needs to work | | String +| *secretKey* (producer) | Amazon AWS Secret Key | | String +| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean +|=== +// endpoint options: END +// spring-boot-auto-configure options: START +=== Spring Boot Auto-Configuration + +When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration: + +[source,xml] +---- +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws-kms-starter</artifactId> + <version>x.x.x</version> + <!-- use the same version as your Camel core version --> +</dependency> +---- + + +The component supports 12 options, which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *camel.component.aws-kms.access-key* | Amazon AWS Access Key | | String +| *camel.component.aws-kms.configuration.access-key* | Amazon AWS Access Key | | String +| *camel.component.aws-kms.configuration.kms-client* | To use a existing configured AWS KMS as client | | AWSKMS +| *camel.component.aws-kms.configuration.operation* | The operation to perform | | KMSOperations +| *camel.component.aws-kms.configuration.proxy-host* | To define a proxy host when instantiating the KMS client | | String +| *camel.component.aws-kms.configuration.proxy-port* | To define a proxy port when instantiating the KMS client | | Integer +| *camel.component.aws-kms.configuration.region* | The region in which KMS client needs to work | | String +| *camel.component.aws-kms.configuration.secret-key* | Amazon AWS Secret Key | | String +| *camel.component.aws-kms.enabled* | Whether to enable auto configuration of the aws-kms component. This is enabled by default. | | Boolean +| *camel.component.aws-kms.region* | The region in which KMS client needs to work | | String +| *camel.component.aws-kms.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean +| *camel.component.aws-kms.secret-key* | Amazon AWS Secret Key | | String +|=== +// spring-boot-auto-configure options: END + + + + +Required KMS component options + +You have to provide the amazonKmsClient in the +Registry or your accessKey and secretKey to access +the https://aws.amazon.com/it/kms/[Amazon KMS] service. + +### Usage + +#### Message headers evaluated by the MQ producer + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Header |Type |Description + +|`CamelAwsKMSLimit` |`Integer` |The limit number of keys to return while performing a listKeys operation + +|`CamelAwsKMSOperation` |`String` |The operation we want to perform + +|`CamelAwsKMSDescription` |`String` |A key description to use while performing a createKey operation + +|`CamelAwsKMSKeyId` |`String` |The key Id +|======================================================================= + +#### KMS Producer operations + +Camel-AWS KMS component provides the following operation on the producer side: + +- listKeys +- createKey +- disableKey +- scheduleKeyDeletion +- describeKey +- enableKey + +Dependencies + +Maven users will need to add the following dependency to their pom.xml. + +*pom.xml* + +[source,xml] +--------------------------------------- +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws-kms</artifactId> + <version>${camel-version}</version> +</dependency> +--------------------------------------- + +where `${camel-version`} must be replaced by the actual version of Camel +(2.16 or higher). + +### See Also + +* Configuring Camel +* Component +* Endpoint +* Getting Started + +* AWS Component diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java new file mode 100644 index 0000000..5b6086a --- /dev/null +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; + +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.annotations.Component; +import org.apache.camel.support.DefaultComponent; +import org.apache.camel.util.ObjectHelper; + +/** + * For working with Amazon KMS. + */ +@Component("aws-msk") +public class MSKComponent extends DefaultComponent { + + @Metadata + private String accessKey; + @Metadata + private String secretKey; + @Metadata + private String region; + @Metadata(label = "advanced") + private MSKConfiguration configuration; + + public MSKComponent() { + this(null); + } + + public MSKComponent(CamelContext context) { + super(context); + + this.configuration = new MSKConfiguration(); + registerExtension(new MSKComponentVerifierExtension()); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + MSKConfiguration configuration = this.configuration.copy(); + setProperties(configuration, parameters); + + if (ObjectHelper.isEmpty(configuration.getAccessKey())) { + setAccessKey(accessKey); + } + if (ObjectHelper.isEmpty(configuration.getSecretKey())) { + setSecretKey(secretKey); + } + if (ObjectHelper.isEmpty(configuration.getRegion())) { + setRegion(region); + } + if (configuration.getMskClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) { + throw new IllegalArgumentException("Amazon msk client or accessKey and secretKey must be specified"); + } + + MSKEndpoint endpoint = new MSKEndpoint(uri, this, configuration); + return endpoint; + } + + public MSKConfiguration getConfiguration() { + return configuration; + } + + /** + * The AWS KMS default configuration + */ + public void setConfiguration(MSKConfiguration configuration) { + this.configuration = configuration; + } + + public String getAccessKey() { + return configuration.getAccessKey(); + } + + /** + * Amazon AWS Access Key + */ + public void setAccessKey(String accessKey) { + configuration.setAccessKey(accessKey); + } + + public String getSecretKey() { + return configuration.getSecretKey(); + } + + /** + * Amazon AWS Secret Key + */ + public void setSecretKey(String secretKey) { + configuration.setSecretKey(secretKey); + } + + public String getRegion() { + return configuration.getRegion(); + } + + /** + * The region in which KMS client needs to work + */ + public void setRegion(String region) { + configuration.setRegion(region); + } + +} diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java new file mode 100644 index 0000000..bf1fa3b --- /dev/null +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import java.util.Map; + +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kafka.AWSKafka; +import com.amazonaws.services.kafka.AWSKafkaClientBuilder; +import com.amazonaws.services.kafka.model.ListClustersRequest; + +import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension; +import org.apache.camel.component.extension.verifier.ResultBuilder; +import org.apache.camel.component.extension.verifier.ResultErrorBuilder; +import org.apache.camel.component.extension.verifier.ResultErrorHelper; + +public class MSKComponentVerifierExtension extends DefaultComponentVerifierExtension { + + public MSKComponentVerifierExtension() { + this("aws-msk"); + } + + public MSKComponentVerifierExtension(String scheme) { + super(scheme); + } + + // ********************************* + // Parameters validation + // ********************************* + + @Override + protected Result verifyParameters(Map<String, Object> parameters) { + + ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.PARAMETERS).error(ResultErrorHelper.requiresOption("accessKey", parameters)) + .error(ResultErrorHelper.requiresOption("secretKey", parameters)).error(ResultErrorHelper.requiresOption("region", parameters)); + + // Validate using the catalog + + super.verifyParametersAgainstCatalog(builder, parameters); + + return builder.build(); + } + + // ********************************* + // Connectivity validation + // ********************************* + + @Override + protected Result verifyConnectivity(Map<String, Object> parameters) { + ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.CONNECTIVITY); + + try { + MSKConfiguration configuration = setProperties(new MSKConfiguration(), parameters); + AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey()); + AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); + ListClustersRequest request = new ListClustersRequest(); + AWSKafka client = AWSKafkaClientBuilder.standard().withCredentials(credentialsProvider).withRegion(Regions.valueOf(configuration.getRegion())).build(); + client.listClusters(request); + } catch (SdkClientException e) { + ResultErrorBuilder errorBuilder = ResultErrorBuilder.withCodeAndDescription(VerificationError.StandardCode.AUTHENTICATION, e.getMessage()) + .detail("aws_mks_exception_message", e.getMessage()).detail(VerificationError.ExceptionAttribute.EXCEPTION_CLASS, e.getClass().getName()) + .detail(VerificationError.ExceptionAttribute.EXCEPTION_INSTANCE, e); + + builder.error(errorBuilder.build()); + } catch (Exception e) { + builder.error(ResultErrorBuilder.withException(e).build()); + } + return builder.build(); + } +} diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java new file mode 100644 index 0000000..93e86bd --- /dev/null +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; +import org.apache.camel.spi.UriPath; + +import com.amazonaws.services.kafka.AWSKafka; + +@UriParams +public class MSKConfiguration implements Cloneable { + + @UriPath(description = "Logical name") + @Metadata(required = true) + private String label; + @UriParam(label = "producer") + private AWSKafka mskClient; + @UriParam(label = "producer", secret = true) + private String accessKey; + @UriParam(label = "producer", secret = true) + private String secretKey; + @UriParam(label = "producer") + @Metadata(required = true) + private MSKOperations operation; + @UriParam(label = "producer") + private String proxyHost; + @UriParam(label = "producer") + private Integer proxyPort; + @UriParam + private String region; + + public AWSKafka getMskClient() { + return mskClient; + } + + /** + * To use a existing configured AWS MSK as client + */ + public void setMskClient(AWSKafka mskClient) { + this.mskClient = mskClient; + } + + public String getAccessKey() { + return accessKey; + } + + /** + * Amazon AWS Access Key + */ + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + /** + * Amazon AWS Secret Key + */ + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public MSKOperations getOperation() { + return operation; + } + + /** + * The operation to perform + */ + public void setOperation(MSKOperations operation) { + this.operation = operation; + } + + public String getProxyHost() { + return proxyHost; + } + + /** + * To define a proxy host when instantiating the KMS client + */ + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Integer getProxyPort() { + return proxyPort; + } + + /** + * To define a proxy port when instantiating the KMS client + */ + public void setProxyPort(Integer proxyPort) { + this.proxyPort = proxyPort; + } + + public String getRegion() { + return region; + } + + /** + * The region in which KMS client needs to work + */ + public void setRegion(String region) { + this.region = region; + } + + // ************************************************* + // + // ************************************************* + + public MSKConfiguration copy() { + try { + return (MSKConfiguration)super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java new file mode 100644 index 0000000..5f2c1e5 --- /dev/null +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +/** + * Constants used in Camel AWS MSK module + */ +public interface MSKConstants { + String OPERATION = "CamelAwsMSKOperation"; + String CLUSTERS_FILTER = "CamelAwsMSKClusterFilter"; + String CLUSTER_NAME = "CamelAwsMSKClusterName"; + String CLUSTER_ARN = "CamelAwsMSKClusterArn"; + String CLUSTER_KAFKA_VERSION = "CamelAwsMSKClusterKafkaVersion"; + String BROKER_NODES_NUMBER = "CamelAwsMSKBrokerNodesNumber"; + String BROKER_NODES_GROUP_INFO = "CamelAwsMSKBrokerNodesGroupInfo"; +} diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java new file mode 100644 index 0000000..f49ab3f --- /dev/null +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.kafka.AWSKafka; +import com.amazonaws.services.kafka.AWSKafkaClientBuilder; + +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.support.ScheduledPollEndpoint; +import org.apache.camel.util.ObjectHelper; + +/** + * The aws-kms is used for managing Amazon KMS + */ +@UriEndpoint(firstVersion = "3.0.0", scheme = "aws-msk", title = "AWS MSK", syntax = "aws-msk:label", producerOnly = true, label = "cloud,management") +public class MSKEndpoint extends ScheduledPollEndpoint { + + private AWSKafka mskClient; + + @UriParam + private MSKConfiguration configuration; + + public MSKEndpoint(String uri, Component component, MSKConfiguration configuration) { + super(uri, component); + this.configuration = configuration; + } + + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("You cannot receive messages from this endpoint"); + } + + public Producer createProducer() throws Exception { + return new MSKProducer(this); + } + + public boolean isSingleton() { + return true; + } + + @Override + public void doStart() throws Exception { + super.doStart(); + + mskClient = configuration.getMskClient() != null ? configuration.getMskClient() : createMSKClient(); + } + + @Override + public void doStop() throws Exception { + if (ObjectHelper.isEmpty(configuration.getMskClient())) { + if (mskClient != null) { + mskClient.shutdown(); + } + } + super.doStop(); + } + + public MSKConfiguration getConfiguration() { + return configuration; + } + + public AWSKafka getMskClient() { + return mskClient; + } + + AWSKafka createMSKClient() { + AWSKafka client = null; + ClientConfiguration clientConfiguration = null; + AWSKafkaClientBuilder clientBuilder = null; + boolean isClientConfigFound = false; + if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { + clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProxyHost(configuration.getProxyHost()); + clientConfiguration.setProxyPort(configuration.getProxyPort()); + isClientConfigFound = true; + } + if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) { + AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey()); + AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); + if (isClientConfigFound) { + clientBuilder = AWSKafkaClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider); + } else { + clientBuilder = AWSKafkaClientBuilder.standard().withCredentials(credentialsProvider); + } + } else { + if (isClientConfigFound) { + clientBuilder = AWSKafkaClientBuilder.standard(); + } else { + clientBuilder = AWSKafkaClientBuilder.standard().withClientConfiguration(clientConfiguration); + } + } + if (ObjectHelper.isNotEmpty(configuration.getRegion())) { + clientBuilder = clientBuilder.withRegion(configuration.getRegion()); + } + client = clientBuilder.build(); + return client; + } +} diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java new file mode 100644 index 0000000..a06b4e7 --- /dev/null +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +public enum MSKOperations { + + listClusters, + createCluster, + deleteCluster +} diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java new file mode 100644 index 0000000..b59b9e1 --- /dev/null +++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.kafka.AWSKafka; +import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo; +import com.amazonaws.services.kafka.model.CreateClusterRequest; +import com.amazonaws.services.kafka.model.CreateClusterResult; +import com.amazonaws.services.kafka.model.DeleteClusterRequest; +import com.amazonaws.services.kafka.model.DeleteClusterResult; +import com.amazonaws.services.kafka.model.ListClustersRequest; +import com.amazonaws.services.kafka.model.ListClustersResult; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.support.DefaultProducer; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.URISupport; + +/** + * A Producer which sends messages to the Amazon MSK Service + * <a href="http://aws.amazon.com/msk/">AWS MSK</a> + */ +public class MSKProducer extends DefaultProducer { + + private transient String mskProducerToString; + + public MSKProducer(Endpoint endpoint) { + super(endpoint); + } + + public void process(Exchange exchange) throws Exception { + switch (determineOperation(exchange)) { + case listClusters: + listClusters(getEndpoint().getMskClient(), exchange); + break; + case createCluster: + createCluster(getEndpoint().getMskClient(), exchange); + break; + case deleteCluster: + deleteCluster(getEndpoint().getMskClient(), exchange); + break; + default: + throw new IllegalArgumentException("Unsupported operation"); + } + } + + private MSKOperations determineOperation(Exchange exchange) { + MSKOperations operation = exchange.getIn().getHeader(MSKConstants.OPERATION, MSKOperations.class); + if (operation == null) { + operation = getConfiguration().getOperation(); + } + return operation; + } + + protected MSKConfiguration getConfiguration() { + return getEndpoint().getConfiguration(); + } + + @Override + public String toString() { + if (mskProducerToString == null) { + mskProducerToString = "MSKProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; + } + return mskProducerToString; + } + + @Override + public MSKEndpoint getEndpoint() { + return (MSKEndpoint)super.getEndpoint(); + } + + private void listClusters(AWSKafka mskClient, Exchange exchange) { + ListClustersRequest request = new ListClustersRequest(); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTERS_FILTER))) { + String filter = exchange.getIn().getHeader(MSKConstants.CLUSTERS_FILTER, String.class); + request.withClusterNameFilter(filter); + } + ListClustersResult result; + try { + result = mskClient.listClusters(request); + } catch (AmazonServiceException ase) { + log.trace("List Clusters command returned the error code {}", ase.getErrorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + } + + private void createCluster(AWSKafka mskClient, Exchange exchange) { + CreateClusterRequest request = new CreateClusterRequest(); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTER_NAME))) { + String name = exchange.getIn().getHeader(MSKConstants.CLUSTER_NAME, String.class); + request.withClusterName(name); + } else { + throw new IllegalArgumentException("Cluster Name must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTER_KAFKA_VERSION))) { + String version = exchange.getIn().getHeader(MSKConstants.CLUSTER_KAFKA_VERSION, String.class); + request.withKafkaVersion(version); + } else { + throw new IllegalArgumentException("Kafka Version must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.BROKER_NODES_NUMBER))) { + Integer nodesNumber = exchange.getIn().getHeader(MSKConstants.BROKER_NODES_NUMBER, Integer.class); + request.withNumberOfBrokerNodes(nodesNumber); + } else { + throw new IllegalArgumentException("Kafka Version must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.BROKER_NODES_GROUP_INFO))) { + BrokerNodeGroupInfo brokerNodesGroupInfo = exchange.getIn().getHeader(MSKConstants.BROKER_NODES_GROUP_INFO, BrokerNodeGroupInfo.class); + request.withBrokerNodeGroupInfo(brokerNodesGroupInfo); + } else { + throw new IllegalArgumentException("BrokerNodeGroupInfo must be specified"); + } + CreateClusterResult result; + try { + result = mskClient.createCluster(request); + } catch (AmazonServiceException ase) { + log.trace("Create Cluster command returned the error code {}", ase.getErrorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + } + + private void deleteCluster(AWSKafka mskClient, Exchange exchange) { + DeleteClusterRequest request = new DeleteClusterRequest(); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTER_ARN))) { + String arn = exchange.getIn().getHeader(MSKConstants.CLUSTER_ARN, String.class); + request.withClusterArn(arn); + } else { + throw new IllegalArgumentException("Cluster ARN must be specified"); + } + DeleteClusterResult result; + try { + result = mskClient.deleteCluster(request); + } catch (AmazonServiceException ase) { + log.trace("Delete Cluster command returned the error code {}", ase.getErrorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + } + + public static Message getMessageForResponse(final Exchange exchange) { + if (exchange.getPattern().isOutCapable()) { + Message out = exchange.getOut(); + out.copyFrom(exchange.getIn()); + return out; + } + return exchange.getIn(); + } +} \ No newline at end of file diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java new file mode 100644 index 0000000..b70c979 --- /dev/null +++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import java.util.ArrayList; +import java.util.List; + +import com.amazonaws.services.kafka.AbstractAWSKafka; +import com.amazonaws.services.kafka.model.ClusterInfo; +import com.amazonaws.services.kafka.model.ClusterState; +import com.amazonaws.services.kafka.model.CreateClusterRequest; +import com.amazonaws.services.kafka.model.CreateClusterResult; +import com.amazonaws.services.kafka.model.DeleteClusterRequest; +import com.amazonaws.services.kafka.model.DeleteClusterResult; +import com.amazonaws.services.kafka.model.ListClustersRequest; +import com.amazonaws.services.kafka.model.ListClustersResult; + + +public class AmazonMSKClientMock extends AbstractAWSKafka { + + public AmazonMSKClientMock() { + super(); + } + + @Override + public ListClustersResult listClusters(ListClustersRequest request) { + ListClustersResult result = new ListClustersResult(); + List<ClusterInfo> info = new ArrayList<ClusterInfo>(); + ClusterInfo info1 = new ClusterInfo(); + info1.setClusterName("test-kafka"); + info.add(info1); + result.setClusterInfoList(info); + return result; + } + + @Override + public CreateClusterResult createCluster(CreateClusterRequest request) { + CreateClusterResult result = new CreateClusterResult(); + result.setClusterName(request.getClusterName()); + result.setState(ClusterState.CREATING.name()); + return result; + } + + @Override + public DeleteClusterResult deleteCluster(DeleteClusterRequest request) { + DeleteClusterResult res = new DeleteClusterResult(); + res.setClusterArn(request.getClusterArn()); + res.setState(ClusterState.DELETING.name()); + return res; + } +} diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java new file mode 100644 index 0000000..ec81a5e --- /dev/null +++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import com.amazonaws.regions.Regions; + +import org.apache.camel.component.aws.msk.MSKComponent; +import org.apache.camel.component.aws.msk.MSKEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class MSKComponentConfigurationTest extends CamelTestSupport { + + + @Test + public void createEndpointWithComponentElements() throws Exception { + MSKComponent component = new MSKComponent(context); + component.setAccessKey("XXX"); + component.setSecretKey("YYY"); + MSKEndpoint endpoint = (MSKEndpoint)component.createEndpoint("aws-msk://label"); + + assertEquals("XXX", endpoint.getConfiguration().getAccessKey()); + assertEquals("YYY", endpoint.getConfiguration().getSecretKey()); + } + + @Test + public void createEndpointWithComponentAndEndpointElements() throws Exception { + MSKComponent component = new MSKComponent(context); + component.setAccessKey("XXX"); + component.setSecretKey("YYY"); + component.setRegion(Regions.US_WEST_1.toString()); + MSKEndpoint endpoint = (MSKEndpoint)component.createEndpoint("aws-msk://label?accessKey=xxxxxx&secretKey=yyyyy®ion=US_EAST_1"); + + assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey()); + assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); + assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion()); + } + +} diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java new file mode 100644 index 0000000..cf57117 --- /dev/null +++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.Component; +import org.apache.camel.component.aws.msk.MSKOperations; +import org.apache.camel.component.extension.ComponentVerifierExtension; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Assert; +import org.junit.Test; + +public class MSKComponentVerifierExtensionTest extends CamelTestSupport { + + // ************************************************* + // Tests (parameters) + // ************************************************* + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testParameters() throws Exception { + Component component = context().getComponent("aws-msk"); + + ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new); + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("secretKey", "l"); + parameters.put("accessKey", "k"); + parameters.put("region", "l"); + parameters.put("label", "test"); + parameters.put("operation", MSKOperations.listClusters); + + ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters); + + Assert.assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus()); + } + + @Test + public void testConnectivity() throws Exception { + Component component = context().getComponent("aws-msk"); + ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new); + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("secretKey", "l"); + parameters.put("accessKey", "k"); + parameters.put("region", "US_EAST_1"); + parameters.put("label", "test"); + parameters.put("operation", MSKOperations.listClusters); + + ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters); + + Assert.assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus()); + } + +} diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java new file mode 100644 index 0000000..50a26f2 --- /dev/null +++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.aws.msk.MSKConstants; +import org.apache.camel.component.aws.msk.MSKOperations; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo; +import com.amazonaws.services.kafka.model.ClusterState; +import com.amazonaws.services.kafka.model.CreateClusterResult; +import com.amazonaws.services.kafka.model.DeleteClusterResult; +import com.amazonaws.services.kafka.model.ListClustersResult; + +public class MSKProducerSpringTest extends CamelSpringTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint mock; + + @Test + public void kmsListKeysTest() throws Exception { + + mock.expectedMessageCount(1); + Exchange exchange = template.request("direct:listClusters", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.listClusters); + } + }); + + assertMockEndpointsSatisfied(); + + ListClustersResult resultGet = (ListClustersResult) exchange.getIn().getBody(); + assertEquals(1, resultGet.getClusterInfoList().size()); + assertEquals("test-kafka", resultGet.getClusterInfoList().get(0).getClusterName()); + } + + @Test + public void mskCreateClusterTest() throws Exception { + + mock.expectedMessageCount(1); + Exchange exchange = template.request("direct:createCluster", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.createCluster); + exchange.getIn().setHeader(MSKConstants.CLUSTER_NAME, "test-kafka"); + exchange.getIn().setHeader(MSKConstants.CLUSTER_KAFKA_VERSION, "2.1.1"); + exchange.getIn().setHeader(MSKConstants.BROKER_NODES_NUMBER, 2); + BrokerNodeGroupInfo groupInfo = new BrokerNodeGroupInfo(); + exchange.getIn().setHeader(MSKConstants.BROKER_NODES_GROUP_INFO, groupInfo); + } + }); + + assertMockEndpointsSatisfied(); + + CreateClusterResult resultGet = (CreateClusterResult) exchange.getIn().getBody(); + assertEquals("test-kafka", resultGet.getClusterName()); + assertEquals(ClusterState.CREATING.name(), resultGet.getState()); + } + + @Test + public void mskDeleteClusterTest() throws Exception { + + mock.expectedMessageCount(1); + Exchange exchange = template.request("direct:deleteCluster", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.deleteCluster); + exchange.getIn().setHeader(MSKConstants.CLUSTER_ARN, "test-kafka"); + } + }); + + assertMockEndpointsSatisfied(); + + DeleteClusterResult resultGet = (DeleteClusterResult) exchange.getIn().getBody(); + assertEquals("test-kafka", resultGet.getClusterArn()); + assertEquals(ClusterState.DELETING.name(), resultGet.getState()); + } + + @Override + protected ClassPathXmlApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml"); + } +} \ No newline at end of file diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java new file mode 100644 index 0000000..23d3ada --- /dev/null +++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.msk; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws.msk.MSKConstants; +import org.apache.camel.component.aws.msk.MSKOperations; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo; +import com.amazonaws.services.kafka.model.ClusterState; +import com.amazonaws.services.kafka.model.CreateClusterResult; +import com.amazonaws.services.kafka.model.DeleteClusterResult; +import com.amazonaws.services.kafka.model.ListClustersResult; + +public class MSKProducerTest extends CamelTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint mock; + + @Test + public void kmsListClustersTest() throws Exception { + + mock.expectedMessageCount(1); + Exchange exchange = template.request("direct:listClusters", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.listClusters); + } + }); + + assertMockEndpointsSatisfied(); + + ListClustersResult resultGet = (ListClustersResult) exchange.getIn().getBody(); + assertEquals(1, resultGet.getClusterInfoList().size()); + assertEquals("test-kafka", resultGet.getClusterInfoList().get(0).getClusterName()); + } + + @Test + public void mskCreateClusterTest() throws Exception { + + mock.expectedMessageCount(1); + Exchange exchange = template.request("direct:createCluster", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.createCluster); + exchange.getIn().setHeader(MSKConstants.CLUSTER_NAME, "test-kafka"); + exchange.getIn().setHeader(MSKConstants.CLUSTER_KAFKA_VERSION, "2.1.1"); + exchange.getIn().setHeader(MSKConstants.BROKER_NODES_NUMBER, 2); + BrokerNodeGroupInfo groupInfo = new BrokerNodeGroupInfo(); + exchange.getIn().setHeader(MSKConstants.BROKER_NODES_GROUP_INFO, groupInfo); + } + }); + + assertMockEndpointsSatisfied(); + + CreateClusterResult resultGet = (CreateClusterResult) exchange.getIn().getBody(); + assertEquals("test-kafka", resultGet.getClusterName()); + assertEquals(ClusterState.CREATING.name(), resultGet.getState()); + } + + @Test + public void mskDeleteClusterTest() throws Exception { + + mock.expectedMessageCount(1); + Exchange exchange = template.request("direct:deleteCluster", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.deleteCluster); + exchange.getIn().setHeader(MSKConstants.CLUSTER_ARN, "test-kafka"); + } + }); + + assertMockEndpointsSatisfied(); + + DeleteClusterResult resultGet = (DeleteClusterResult) exchange.getIn().getBody(); + assertEquals("test-kafka", resultGet.getClusterArn()); + assertEquals(ClusterState.DELETING.name(), resultGet.getState()); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + AmazonMSKClientMock clientMock = new AmazonMSKClientMock(); + + registry.bind("amazonMskClient", clientMock); + + return registry; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:listClusters") + .to("aws-msk://test?mskClient=#amazonMskClient&operation=listClusters") + .to("mock:result"); + from("direct:createCluster") + .to("aws-msk://test?mskClient=#amazonMskClient&operation=createCluster") + .to("mock:result"); + from("direct:deleteCluster") + .to("aws-msk://test?mskClient=#amazonMskClient&operation=deleteCluster") + .to("mock:result"); + } + }; + } +} \ No newline at end of file diff --git a/components/camel-aws-msk/src/test/resources/log4j2.properties b/components/camel-aws-msk/src/test/resources/log4j2.properties new file mode 100644 index 0000000..986f470 --- /dev/null +++ b/components/camel-aws-msk/src/test/resources/log4j2.properties @@ -0,0 +1,28 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-aws-kms-test.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = file diff --git a/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml b/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml new file mode 100644 index 0000000..af87975 --- /dev/null +++ b/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:listClusters"/> + <to uri="aws-msk://Test?mskClient=#amazonMskClient&operation=listClusters"/> + <to uri="mock:result"/> + </route> + + <route> + <from uri="direct:createCluster"/> + <to uri="aws-msk://Test?mskClient=#amazonMskClient&operation=createCluster"/> + <to uri="mock:result"/> + </route> + + <route> + <from uri="direct:deleteCluster"/> + <to uri="aws-msk://Test?mskClient=#amazonMskClient&operation=deleteCluster"/> + <to uri="mock:result"/> + </route> + </camelContext> + + <bean id="amazonMskClient" class="org.apache.camel.component.aws.msk.AmazonMSKClientMock"/> +</beans> \ No newline at end of file diff --git a/platforms/spring-boot/components-starter/pom.xml b/platforms/spring-boot/components-starter/pom.xml index 2bc538c..a1dde11 100644 --- a/platforms/spring-boot/components-starter/pom.xml +++ b/platforms/spring-boot/components-starter/pom.xml @@ -124,6 +124,7 @@ <module>camel-aws-kms-starter</module> <module>camel-aws-lambda-starter</module> <module>camel-aws-mq-starter</module> + <module>camel-aws-msk-starter</module> <module>camel-aws-s3-starter</module> <module>camel-aws-sdb-starter</module> <module>camel-aws-ses-starter</module>