CAMEL-10178: Google PubSub Component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/07f57c33 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/07f57c33 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/07f57c33 Branch: refs/heads/master Commit: 07f57c33962cd4a3da8a43f0739d954b0de3b884 Parents: c2ffa95 Author: Evgeny Minkevich <evgeny.minkev...@gmail.com> Authored: Sat Nov 12 14:44:47 2016 +1100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Nov 14 08:23:42 2016 +0100 ---------------------------------------------------------------------- apache-camel/pom.xml | 9 + .../src/main/descriptors/common-bin.xml | 4 +- .../camel-google-pubsub-starter/pom.xml | 59 ++++++ .../GooglePubsubComponentAutoConfiguration.java | 74 +++++++ .../GooglePubsubComponentConfiguration.java | 48 +++++ .../src/main/resources/META-INF/LICENSE.txt | 203 ++++++++++++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + .../main/resources/META-INF/spring.factories | 19 ++ .../src/main/resources/META-INF/spring.provides | 18 ++ components-starter/pom.xml | 1 + components/camel-google-pubsub/.gitignore | 7 + components/camel-google-pubsub/ReadMe.md | 36 ++++ components/camel-google-pubsub/pom.xml | 194 +++++++++++++++++ .../src/main/docs/google-pubsub-component.adoc | 148 +++++++++++++ .../google/pubsub/GooglePubsubComponent.java | 79 +++++++ .../pubsub/GooglePubsubConnectionFactory.java | 209 +++++++++++++++++++ .../google/pubsub/GooglePubsubConstants.java | 34 +++ .../google/pubsub/GooglePubsubConsumer.java | 171 +++++++++++++++ .../google/pubsub/GooglePubsubEndpoint.java | 187 +++++++++++++++++ .../google/pubsub/GooglePubsubProducer.java | 151 ++++++++++++++ .../pubsub/consumer/ExchangeAckTransaction.java | 61 ++++++ .../pubsub/consumer/PubsubAcknowledgement.java | 83 ++++++++ .../src/main/resources/META-INF/LICENSE.txt | 203 ++++++++++++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + 
.../org/apache/camel/component/google-pubsub | 18 ++ .../google/pubsub/PubsubTestSupport.java | 140 +++++++++++++ .../pubsub/integration/AckModeNoneTest.java | 94 +++++++++ .../pubsub/integration/AcknowledgementTest.java | 136 ++++++++++++ .../pubsub/integration/BodyTypesTest.java | 171 +++++++++++++++ .../GroupedExchangeRoundtripTest.java | 119 +++++++++++ .../PubsubConnectionFactoryTest.java | 65 ++++++ .../SingleExchangeRoundtripTest.java | 132 ++++++++++++ .../google/pubsub/unit/PubsubComponentTest.java | 43 ++++ .../google/pubsub/unit/PubsubEndpointTest.java | 76 +++++++ .../google/pubsub/unit/PubsubProducerTest.java | 58 +++++ .../test/resources/camel-pubsub-component.json | 12 ++ .../src/test/resources/log4j.properties | 37 ++++ .../src/test/resources/logging.properties | 4 + .../src/test/resources/simple.properties | 9 + components/pom.xml | 1 + components/readme.adoc | 3 + docs/user-manual/en/SUMMARY.md | 1 + parent/pom.xml | 11 + .../features/src/main/resources/features.xml | 31 ++- .../camel-spring-boot-dependencies/pom.xml | 10 + .../itest/karaf/CamelGooglePubsubTest.java | 33 +++ 46 files changed, 3215 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/apache-camel/pom.xml ---------------------------------------------------------------------- diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml index aa42c84..f404cab 100644 --- a/apache-camel/pom.xml +++ b/apache-camel/pom.xml @@ -319,6 +319,10 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-google-pubsub</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-grape</artifactId> </dependency> <dependency> @@ -1321,6 +1325,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-google-pubsub-starter</artifactId> + <version>${project.version}</version> + </dependency> + 
<dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-gora-starter</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/apache-camel/src/main/descriptors/common-bin.xml ---------------------------------------------------------------------- diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml index 8528374..ece1c26 100644 --- a/apache-camel/src/main/descriptors/common-bin.xml +++ b/apache-camel/src/main/descriptors/common-bin.xml @@ -91,6 +91,7 @@ <include>org.apache.camel:camel-google-calendar</include> <include>org.apache.camel:camel-google-drive</include> <include>org.apache.camel:camel-google-mail</include> + <include>org.apache.camel:camel-google-pubsub</include> <include>org.apache.camel:camel-gora</include> <include>org.apache.camel:camel-groovy</include> <include>org.apache.camel:camel-guava-eventbus</include> @@ -234,7 +235,7 @@ <include>org.apache.camel:camel-stringtemplate</include> <include>org.apache.camel:camel-syslog</include> <include>org.apache.camel:camel-swagger</include> - <include>org.apache.camel:camel-swagger-java</include> + <include>org.apache.camel:camel-swagger-java</include> <include>org.apache.camel:camel-tagsoup</include> <include>org.apache.camel:camel-tarfile</include> <include>org.apache.camel:camel-telegram</include> @@ -340,6 +341,7 @@ <include>org.apache.camel:camel-google-calendar-starter</include> <include>org.apache.camel:camel-google-drive-starter</include> <include>org.apache.camel:camel-google-mail-starter</include> + <include>org.apache.camel:camel-google-pubsub-starter</include> <include>org.apache.camel:camel-gora-starter</include> <include>org.apache.camel:camel-grape-starter</include> <include>org.apache.camel:camel-groovy-starter</include> http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/camel-google-pubsub-starter/pom.xml 
---------------------------------------------------------------------- diff --git a/components-starter/camel-google-pubsub-starter/pom.xml b/components-starter/camel-google-pubsub-starter/pom.xml new file mode 100644 index 0000000..ef0847e --- /dev/null +++ b/components-starter/camel-google-pubsub-starter/pom.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components-starter</artifactId> + <version>2.19.0-SNAPSHOT</version> + </parent> + <artifactId>camel-google-pubsub-starter</artifactId> + <packaging>jar</packaging> + <name>Spring-Boot Starter :: Camel :: GooglePubsub</name> + <description>Spring-Boot Starter for Camel Component for Google Cloud Platform Pubsub</description> + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter</artifactId> + <version>${spring-boot-version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-google-pubsub</artifactId> + <version>${project.version}</version> + <!--START OF GENERATED CODE--> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + <!--END OF GENERATED CODE--> + </dependency> + <!--START OF GENERATED CODE--> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core-starter</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-spring-boot-starter</artifactId> + </dependency> + <!--END OF GENERATED CODE--> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentAutoConfiguration.java 
b/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentAutoConfiguration.java new file mode 100644 index 0000000..7bfbb6e --- /dev/null +++ b/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentAutoConfiguration.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.springboot; + +import java.util.HashMap; +import java.util.Map; +import org.apache.camel.CamelContext; +import org.apache.camel.component.google.pubsub.GooglePubsubComponent; +import org.apache.camel.util.IntrospectionSupport; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Generated by camel-package-maven-plugin - do not edit this file! 
+ */ +@Configuration +@ConditionalOnBean(type = "org.apache.camel.springboot.CamelAutoConfiguration") +@EnableConfigurationProperties(GooglePubsubComponentConfiguration.class) +public class GooglePubsubComponentAutoConfiguration { + + @Bean(name = "google-pubsub-component") + @ConditionalOnClass(CamelContext.class) + @ConditionalOnMissingBean(GooglePubsubComponent.class) + public GooglePubsubComponent configureGooglePubsubComponent( + CamelContext camelContext, + GooglePubsubComponentConfiguration configuration) throws Exception { + GooglePubsubComponent component = new GooglePubsubComponent(); + component.setCamelContext(camelContext); + Map<String, Object> parameters = new HashMap<>(); + IntrospectionSupport.getProperties(configuration, parameters, null, + false); + for (Map.Entry<String, Object> entry : parameters.entrySet()) { + Object value = entry.getValue(); + Class<?> paramClass = value.getClass(); + if (paramClass.getName().endsWith("NestedConfiguration")) { + Class nestedClass = null; + try { + nestedClass = (Class) paramClass.getDeclaredField( + "CAMEL_NESTED_CLASS").get(null); + HashMap<String, Object> nestedParameters = new HashMap<>(); + IntrospectionSupport.getProperties(value, nestedParameters, + null, false); + Object nestedProperty = nestedClass.newInstance(); + IntrospectionSupport.setProperties(camelContext, + camelContext.getTypeConverter(), nestedProperty, + nestedParameters); + entry.setValue(nestedProperty); + } catch (NoSuchFieldException e) { + } + } + } + IntrospectionSupport.setProperties(camelContext, + camelContext.getTypeConverter(), component, parameters); + return component; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentConfiguration.java ---------------------------------------------------------------------- diff --git 
a/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentConfiguration.java b/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentConfiguration.java new file mode 100644 index 0000000..e60d01e --- /dev/null +++ b/components-starter/camel-google-pubsub-starter/src/main/java/org/apache/camel/component/google/pubsub/springboot/GooglePubsubComponentConfiguration.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.springboot; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * PubSub Endpoint Definition + * + * Generated by camel-package-maven-plugin - do not edit this file! 
+ */ +@ConfigurationProperties(prefix = "camel.component.google-pubsub") +public class GooglePubsubComponentConfiguration { + + /** + * Sets the connection factory to use: provides the ability to explicitly + * manage connection credentials: - the path to the key file - the Service + * Account Key / Email pair + */ + private GooglePubsubConnectionFactoryNestedConfiguration connectionFactory; + + public GooglePubsubConnectionFactoryNestedConfiguration getConnectionFactory() { + return connectionFactory; + } + + public void setConnectionFactory( + GooglePubsubConnectionFactoryNestedConfiguration connectionFactory) { + this.connectionFactory = connectionFactory; + } + + public static class GooglePubsubConnectionFactoryNestedConfiguration { + public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/LICENSE.txt ---------------------------------------------------------------------- diff --git a/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/LICENSE.txt b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/LICENSE.txt new file mode 100644 index 0000000..6b0b127 --- /dev/null +++ b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/LICENSE.txt @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. 
+ + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/NOTICE.txt ---------------------------------------------------------------------- diff --git a/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/NOTICE.txt b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/NOTICE.txt new file mode 100644 index 0000000..2e215bf --- /dev/null +++ b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/NOTICE.txt @@ -0,0 +1,11 @@ + ========================================================================= + == NOTICE file corresponding to the section 4 d of == + == the Apache License, Version 2.0, == + == in this case for the Apache Camel distribution. 
== + ========================================================================= + + This product includes software developed by + The Apache Software Foundation (http://www.apache.org/). + + Please read the different LICENSE files present in the licenses directory of + this distribution. http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.factories ---------------------------------------------------------------------- diff --git a/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.factories b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..709d47e --- /dev/null +++ b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.apache.camel.component.google.pubsub.springboot.GooglePubsubComponentAutoConfiguration http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.provides ---------------------------------------------------------------------- diff --git a/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.provides b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.provides new file mode 100644 index 0000000..16766c4 --- /dev/null +++ b/components-starter/camel-google-pubsub-starter/src/main/resources/META-INF/spring.provides @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +provides: camel-google-pubsub \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components-starter/pom.xml ---------------------------------------------------------------------- diff --git a/components-starter/pom.xml b/components-starter/pom.xml index aaed1da..5dc6005 100644 --- a/components-starter/pom.xml +++ b/components-starter/pom.xml @@ -137,6 +137,7 @@ <module>camel-google-calendar-starter</module> <module>camel-google-drive-starter</module> <module>camel-google-mail-starter</module> + <module>camel-google-pubsub-starter</module> <module>camel-gora-starter</module> <module>camel-grape-starter</module> <module>camel-groovy-starter</module> http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/.gitignore ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/.gitignore b/components/camel-google-pubsub/.gitignore new file mode 100644 index 0000000..d10190d --- /dev/null +++ b/components/camel-google-pubsub/.gitignore @@ -0,0 +1,7 @@ +.idea +*.iml +.settings +.project +.classpath +target +deploy*.cmd http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/ReadMe.md ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/ReadMe.md b/components/camel-google-pubsub/ReadMe.md new file mode 100644 index 0000000..abb6ffa --- /dev/null +++ b/components/camel-google-pubsub/ReadMe.md @@ -0,0 +1,36 @@ +## Camel Google PubSub Component testing + +The unit tests provided are somewhat limited. +Due to the nature of the component, it needs to be tested against a server. To assist with this task Google has provided +a PubSub Emulator. 
The tests for the component have, therefore, been split into two groups : + +* Unit : <br> + Standalone tests that can be conducted on their own +* Integration : <br> + Tests against the emulator + +### Emulator local installation + +Emulator is being distributed with the [Google SDK](https://cloud.google.com/sdk/). +Once the SDK has been installed and configured, add PubSub Emulator: + +``` +gcloud components install pubsub-emulator +``` + +Please note the folder where it is installed and configure the _GCLOUD_SDK_PATH_ environment variable. +It is a custom variable, used by the mvn plugin to find the installation. + + +### Execution + +Maven is configured to start the emulator prior to the integration tests and shut it down afterwards. +The emulator is configured to listen on port 8383. + +Integration tests and the emulator are available as part of the _google-pubsub-test_ profile: + +``` +mvn -Pgoogle-pubsub-test verify +``` + + http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/pom.xml b/components/camel-google-pubsub/pom.xml new file mode 100644 index 0000000..2dc609e --- /dev/null +++ b/components/camel-google-pubsub/pom.xml @@ -0,0 +1,194 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.19.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-google-pubsub</artifactId> + <packaging>jar</packaging> + <name>Camel :: GooglePubsub</name> + <description>Camel Component for Google Cloud Platform Pubsub</description> + + <properties> + <schemeName>google-pubsub</schemeName> + <componentName>GooglePubSub</componentName> + <componentPackage>org.apache.camel.component.google.pubsub</componentPackage> + <camel.osgi.export.pkg>org.apache.camel.component.google.pubsub</camel.osgi.export.pkg> + <camel.osgi.export.service> + org.apache.camel.spi.ComponentResolver;component=google-pubsub + </camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-pubsub</artifactId> + <version>${google-api-services-pubsub-version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + 
<plugins> + + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <includes> + <include>**/unit/*Test.java</include> + </includes> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <configuration> + <encoding>UTF-8</encoding> + </configuration> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.camel</groupId> + <artifactId>camel-api-component-maven-plugin</artifactId> + <version>${project.version}</version> + <configuration> + <scheme>${schemeName}</scheme> + <componentName>${componentName}</componentName> + <componentPackage>${componentPackage}</componentPackage> + <outPackage>${outPackage}</outPackage> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + </build> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.camel</groupId> + <artifactId>camel-api-component-maven-plugin</artifactId> + <version>${project.version}</version> + <configuration> + <scheme>${schemeName}</scheme> + <componentName>${componentName}</componentName> + <componentPackage>${componentPackage}</componentPackage> + <outPackage>${outPackage}</outPackage> + </configuration> + </plugin> + </plugins> + </reporting> + + <profiles> + <profile> + <id>google-pubsub-test</id> + <build> + <plugins> + + <!-- + PUBSUB EMULATOR + depends on Google SDK being installed and available in $PATH + https://cloud.google.com/sdk/downloads + --> + <plugin> + <groupId>com.bazaarvoice.maven.plugins</groupId> + <artifactId>process-exec-maven-plugin</artifactId> + <version>0.7</version> + <executions> + + <!-- Start : compile, before tests --> + <execution> + <id>pubsub-emulator</id> + <phase>test-compile</phase> + <goals> + <goal>start</goal> + </goals> + <configuration> + <name>Google PubSub Emulator</name> + <waitForInterrupt>false</waitForInterrupt> + <healthcheckUrl>http://localhost:8383</healthcheckUrl> 
+ <workingDir>/tmp</workingDir> + <arguments> + <argument>java</argument> + <argument> + -Djava.util.logging.config.file=${project.basedir}/src/test/resources/logging.properties + </argument> + <argument>-jar</argument> + <argument> + ${env.GCLOUD_SDK_PATH}/platform/pubsub-emulator/lib/cloud-pubsub-emulator-0.1-SNAPSHOT-all.jar + </argument> + <argument>--port=8383</argument> + </arguments> + </configuration> + </execution> + + <!-- Stop : package, after test --> + <execution> + <id>stop-all</id> + <phase>prepare-package</phase> + <goals> + <goal>stop-all</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <childDelegation>false</childDelegation> + <useFile>true</useFile> + <forkCount>1</forkCount> + <reuseForks>true</reuseForks> + <forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc new file mode 100644 index 0000000..f26c2b3 --- /dev/null +++ b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc @@ -0,0 +1,148 @@ +[[GooglePubsub-GooglePubsubComponent]] +GooglePubsub Component +~~~~~~~~~~~~~~~~~~ + +*Available as of Camel 2.19* + +[[GooglePubsub-ComponentDescription]] +Component Description +^^^^^^^^^^^^^^^^^^^^^ + +The Google Pubsub component provides access +to https://cloud.google.com/pubsub/[Cloud Pub/Sub Infrastructure] via +the https://cloud.google.com/apis/docs/client-libraries-explained[Google Client Services API].
+ +The current implementation does not use gRPC. + +Maven users will need to add the following dependency to their pom.xml +for this component: + +------------------------------------------------------ + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-google-pubsub</artifactId> + <version>2.19.0-SNAPSHOT</version> + </dependency> + +------------------------------------------------------ + +[[GooglePubsub-AuthenticationConfiguration]] +Authentication Configuration + +Google Pubsub component authentication is targeted for use with the GCP Service Accounts. +For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] + +Google security credentials can be set explicitly via one of the two options: + +* Service Account Email and Service Account Key (PEM format) +* GCP credentials file location + +If both are set, the Service Account Email/Key will take precedence. + +Or implicitly, where the connection factory falls back on +https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. + +*Note:* The location of the default credentials file is configurable via the GOOGLE_APPLICATION_CREDENTIALS environment variable. + +Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. + +[[GooglePubsub-URIFormat]] +URI Format +^^^^^^^^^ + +The Google Pubsub component uses the following URI format: + +-------------------------------------------------------- + google-pubsub://project-id:destinationName?[options] +-------------------------------------------------------- + +Destination Name can be either a topic or a subscription name. + +[[GooglePubsub-GooglePubsubComponent]] +GooglePubsubComponent +^^^^^^^^^^^^^^^^^^ + +// component options: START +The Google Pubsub component supports 1 options which are listed below.
+ + + +{% raw %} +[width="100%",cols="2,1m,7",options="header"] +|======================================================================= +| Name | Java Type | Description +| connectionFactory | GooglePubsubConnectionFactory | Sets the connection factory to use: provides the ability to explicitly manage connection credentials: - the path to the key file - the Service Account Key / Email pair +|======================================================================= +{% endraw %} +// component options: END + +// endpoint options: START +The Google Pubsub component supports 11 endpoint options which are listed below: + +{% raw %} +[width="100%",cols="2,1,1m,1m,5",options="header"] +|======================================================================= +| Name | Group | Default | Java Type | Description +| projectId | common | | String | *Required* Project Id +| destinationName | common | | String | *Required* Destination Name +| ackMode | common | AUTO | AckMode | AUTO = exchange gets ack'ed/nack'ed on completion. NONE = downstream process has to ack/nack explicitly +| concurrentConsumers | common | | Integer | The number of parallel streams consuming from the subscription +| connectionFactory | common | | GooglePubsubConnectionFactory | ConnectionFactory to obtain connection to PubSub Service. If non provided the default one will be used +| loggerId | common | | String | Logger ID to use when a match to the parent route required +| maxMessagesPerPoll | common | | Integer | The max number of messages to receive from the server in a single API call +| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. 
By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored. +| exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored. +| exchangePattern | consumer (advanced) | | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange. +| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). +|======================================================================= +{% endraw %} +// endpoint options: END + +[[GooglePubsub-ProducerEndpoints]] +Producer Endpoints +^^^^^^^^^^^^^^^^^^ + +Producer endpoints can accept and deliver to PubSub individual and grouped +exchanges alike. Grouped exchanges have `Exchange.GROUPED_EXCHANGE` property set. + +Google PubSub expects the payload to be byte[] array, Producer endpoints will send: + +* String body as byte[] encoded as UTF-8 +* byte[] body as is +* Everything else will be serialised into byte[] array + +A Map set as message header `GooglePubsubConstants.ATTRIBUTES` will be sent as PubSub attributes. +Once exchange has been delivered to PubSub the PubSub Message ID will be assigned to +the header `GooglePubsubConstants.MESSAGE_ID`. + +[[GooglePubsub-ConsumerEndpoints]] +Consumer Endpoints +^^^^^^^^^^^^^^^^^^ +Google PubSub will redeliver the message if it has not been acknowledged within the time period set +as a configuration option on the subscription. + +The component will acknowledge the message once exchange processing has been completed. 
+ +If the route throws an exception, the exchange is marked as failed and the component will NACK the message - +it will be redelivered immediately. + +To ack/nack the message the component uses the Acknowledgement ID stored as header `GooglePubsubConstants.ACK_ID`. +If the header is removed or tampered with, the ack will fail and the message will be redelivered +again after the ack deadline. + +[[GooglePubsub-MessageHeaders]] +Message Headers +^^^^^^^^^^^^ +Headers set by the consumer endpoints: + +* GooglePubsubConstants.MESSAGE_ID +* GooglePubsubConstants.ATTRIBUTES +* GooglePubsubConstants.PUBLISH_TIME +* GooglePubsubConstants.ACK_ID + +[[GooglePubsub-MessageBody]] +Message Body +^^^^^^^^^^^^ + +The consumer endpoint returns the content of the message as byte[] - exactly as the underlying system sends it. +It is up to the route to convert/unmarshal the contents. http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java new file mode 100644 index 0000000..a79c67e --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.camel.component.google.pubsub;

import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;

/**
 * Represents the component that manages {@link GooglePubsubEndpoint}.
 * <p>
 * Endpoint URIs have the form {@code google-pubsub://projectId:destinationName?[options]}.
 */
public class GooglePubsubComponent extends UriEndpointComponent {

    private GooglePubsubConnectionFactory connectionFactory;

    public GooglePubsubComponent() {
        super(GooglePubsubEndpoint.class);
    }

    public GooglePubsubComponent(CamelContext context) {
        super(context, GooglePubsubEndpoint.class);
    }

    /**
     * Creates an endpoint from the {@code projectId:destinationName} part of the URI.
     *
     * @param uri        the full endpoint URI
     * @param remaining  the URI part after the scheme, expected as {@code projectId:destinationName}
     * @param parameters the URI query parameters, applied to the endpoint as properties
     * @return the configured {@link GooglePubsubEndpoint}
     * @throws IllegalArgumentException if the project id or the destination name is missing or blank
     */
    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {

        String[] parts = remaining.split(":");

        // split() keeps a leading empty token (":topic"), so a length check alone lets a blank project id through
        if (parts.length < 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Google PubSub Endpoint format \"projectId:destinationName\", but was \"" + remaining + "\"");
        }

        // segments after the second one are not used, exactly as before
        GooglePubsubEndpoint pubsubEndpoint = new GooglePubsubEndpoint(uri, this, remaining);
        pubsubEndpoint.setProjectId(parts[0]);
        pubsubEndpoint.setDestinationName(parts[1]);

        setProperties(pubsubEndpoint, parameters);

        return pubsubEndpoint;
    }

    /**
     * Returns the connection factory, creating a default {@link GooglePubsubConnectionFactory}
     * on first access when none has been set.
     */
    public GooglePubsubConnectionFactory getConnectionFactory() {
        if (connectionFactory == null) {
            connectionFactory = new GooglePubsubConnectionFactory();
        }
        return connectionFactory;
    }

    /**
     * Sets the connection factory to use:
     * provides the ability to explicitly manage connection credentials:
     * - the path to the key file
     * - the Service Account Key / Email pair
     */
    public void setConnectionFactory(GooglePubsubConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }
}

http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java new file mode 100644 index 0000000..104bd63 --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.camel.component.google.pubsub;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Collection;
import java.util.Collections;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.util.Base64;
import com.google.api.client.util.Strings;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds and caches the {@link Pubsub} client used by the component.
 * <p>
 * Credentials are resolved in this order:
 * <ol>
 * <li>Service Account e-mail and Service Account key (PEM), when both are set;</li>
 * <li>the credentials file at {@code credentialsFileLocation}, when set;</li>
 * <li>the Application Default Credentials otherwise.</li>
 * </ol>
 * Changing any setting discards the cached client, so the next {@link #getClient()} call rebuilds it.
 */
public class GooglePubsubConnectionFactory {

    private static final Logger LOG = LoggerFactory.getLogger(GooglePubsubConnectionFactory.class);

    // Per-instance: these used to be static fields that every constructor call overwrote.
    private final JsonFactory jsonFactory;
    private final HttpTransport transport;

    private String serviceAccount;
    private String serviceAccountKey;
    private String credentialsFileLocation;
    private String serviceURL;

    private Pubsub client;

    /**
     * Creates a factory with a Jackson JSON factory and a trusted HTTP transport.
     *
     * @throws RuntimeException if the trusted transport cannot be created
     */
    public GooglePubsubConnectionFactory() {
        this.jsonFactory = new JacksonFactory();

        try {
            this.transport = GoogleNetHttpTransport.newTrustedTransport();
        } catch (GeneralSecurityException | IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the cached client, building it on first use or after a setting has changed.
     */
    public synchronized Pubsub getClient() throws Exception {
        if (this.client == null) {
            this.client = buildClient();
        }
        return this.client;
    }

    /** Resolves the credential (explicit pair, then file, then defaults) and builds the client. */
    private Pubsub buildClient() throws Exception {

        GoogleCredential credential = null;

        if (!Strings.isNullOrEmpty(serviceAccount) && !Strings.isNullOrEmpty(serviceAccountKey)) {
            LOG.debug("Service Account and Key have been set explicitly. Initialising PubSub using Service Account {}", serviceAccount);
            credential = createFromAccountKeyPair();
        }

        if (credential == null && !Strings.isNullOrEmpty(credentialsFileLocation)) {
            LOG.debug("Key File Name has been set explicitly. Initialising PubSub using Key File {}", credentialsFileLocation);
            credential = createFromFile();
        }

        if (credential == null) {
            LOG.debug("No explicit Service Account or Key File Name have been provided. Initialising PubSub using defaults ");
            credential = createDefault();
        }

        Pubsub.Builder builder = new Pubsub.Builder(transport, jsonFactory, credential)
                .setApplicationName("camel-google-pubsub");

        // Local emulator, SOCKS proxy, etc.
        if (serviceURL != null) {
            builder.setRootUrl(serviceURL);
        }

        return builder.build();
    }

    /** Loads the credential from {@code credentialsFileLocation}, closing the file afterwards. */
    private GoogleCredential createFromFile() throws Exception {

        GoogleCredential credential;
        // the stream was previously never closed
        try (InputStream credentialsStream = new FileInputStream(credentialsFileLocation)) {
            credential = GoogleCredential.fromStream(credentialsStream);
        }

        if (credential.createScopedRequired()) {
            credential = credential.createScoped(PubsubScopes.all());
        }

        return credential;
    }

    /** Falls back on the Application Default Credentials, scoped to PubSub when scoping is required. */
    private GoogleCredential createDefault() throws Exception {
        GoogleCredential credential = GoogleCredential.getApplicationDefault();

        Collection<String> pubSubScopes = Collections.singletonList(PubsubScopes.PUBSUB);

        if (credential.createScopedRequired()) {
            credential = credential.createScoped(pubSubScopes);
        }

        return credential;
    }

    /** Builds the credential from the explicitly configured Service Account e-mail / key pair. */
    private GoogleCredential createFromAccountKeyPair() {
        return new GoogleCredential.Builder()
                .setTransport(transport)
                .setJsonFactory(jsonFactory)
                .setServiceAccountId(serviceAccount)
                .setServiceAccountScopes(PubsubScopes.all())
                .setServiceAccountPrivateKey(getPrivateKeyFromString(serviceAccountKey))
                .build();
    }

    /**
     * Parses a PKCS#8 PEM string into an RSA private key.
     *
     * @throws RuntimeException if the PEM content cannot be turned into a key
     */
    private PrivateKey getPrivateKeyFromString(String serviceKeyPem) {
        try {
            // strip the armour and every whitespace character, not only CR/LF
            String privKeyPEM = serviceKeyPem.replace("-----BEGIN PRIVATE KEY-----", "")
                    .replace("-----END PRIVATE KEY-----", "")
                    .replaceAll("\\s", "");

            byte[] encoded = Base64.decodeBase64(privKeyPEM);

            PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
            return KeyFactory.getInstance("RSA").generatePrivate(keySpec);
        } catch (GeneralSecurityException | IllegalArgumentException e) {
            // report once, through the exception, instead of logging and rethrowing
            throw new RuntimeException("Constructing Private Key from PEM string failed: " + e.getMessage(), e);
        }
    }

    public String getServiceAccount() {
        return serviceAccount;
    }

    /** Sets the Service Account e-mail ({@code client_email} in the JSON credentials file). */
    public GooglePubsubConnectionFactory setServiceAccount(String serviceAccount) {
        this.serviceAccount = serviceAccount;
        resetClient();
        return this;
    }

    public String getServiceAccountKey() {
        return serviceAccountKey;
    }

    /** Sets the Service Account key in PEM format ({@code private_key} in the JSON credentials file). */
    public GooglePubsubConnectionFactory setServiceAccountKey(String serviceAccountKey) {
        this.serviceAccountKey = serviceAccountKey;
        resetClient();
        return this;
    }

    public String getCredentialsFileLocation() {
        return credentialsFileLocation;
    }

    /** Sets the location of the GCP credentials file. */
    public GooglePubsubConnectionFactory setCredentialsFileLocation(String credentialsFileLocation) {
        this.credentialsFileLocation = credentialsFileLocation;
        resetClient();
        return this;
    }

    public String getServiceURL() {
        return serviceURL;
    }

    /** Overrides the service root URL, e.g. to point at a local emulator. */
    public GooglePubsubConnectionFactory setServiceURL(String serviceURL) {
        this.serviceURL = serviceURL;
        resetClient();
        return this;
    }

    /** Drops the cached client so that the next {@link #getClient()} call picks up the new settings. */
    private synchronized void resetClient() {
        this.client = null;
    }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java ---------------------------------------------------------------------- diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java new file mode 100644 index 0000000..669a521 --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.camel.component.google.pubsub;

/**
 * Message header names and the acknowledgement modes used by the Google PubSub component.
 */
public final class GooglePubsubConstants {

    /** Header carrying the PubSub message ID. */
    public static final String MESSAGE_ID = "CamelGooglePubsub.MessageId";

    /** Header carrying the acknowledgement ID of a received message. */
    public static final String ACK_ID = "CamelGooglePubsub.MsgAckId";

    /** Header carrying the publish time of a received message. */
    public static final String PUBLISH_TIME = "CamelGooglePubsub.PublishTime";

    /** Header carrying the PubSub message attributes. */
    public static final String ATTRIBUTES = "CamelGooglePubsub.Attributes";

    /**
     * Acknowledgement modes of the consumer.
     */
    public enum AckMode {
        AUTO,
        NONE
    }

    private GooglePubsubConstants() {
        // constants holder, never instantiated
    }
}

http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java new file mode 100644 index 0000000..6e2ff5c --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.camel.component.google.pubsub;

import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.ExecutorService;

import com.google.api.client.util.Strings;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.api.services.pubsub.model.PullRequest;
import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.google.pubsub.consumer.ExchangeAckTransaction;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.spi.Synchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;

/**
 * Consumer that pulls messages from a PubSub subscription and hands each one to the route processor.
 * <p>
 * On start it submits one poller per configured concurrent consumer to an executor obtained from
 * the endpoint. Unless the endpoint's ack mode is {@code NONE}, every exchange gets an
 * {@link ExchangeAckTransaction} registered as its on-completion callback.
 */
class GooglePubsubConsumer extends DefaultConsumer {

    private final Logger localLog;

    private ExecutorService executor;
    private final GooglePubsubEndpoint endpoint;
    private final Processor processor;
    private final Synchronization ackStrategy;

    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        this.ackStrategy = new ExchangeAckTransaction(this.endpoint);

        // log under the endpoint's logger id when one is configured, under this class otherwise
        String loggerId = endpoint.getLoggerId();

        if (Strings.isNullOrEmpty(loggerId)) {
            loggerId = this.getClass().getName();
        }

        localLog = LoggerFactory.getLogger(loggerId);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        localLog.info("Starting Google PubSub consumer");
        executor = endpoint.createExecutor();
        for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
            executor.submit(new PubsubPoller(String.valueOf(i)));
        }
    }

    @Override
    protected void doStop() throws Exception {
        super.doStop();
        localLog.info("Stopping Google PubSub consumer");

        if (executor != null) {
            // prefer the Camel executor service manager; fall back to a plain shutdown without a context
            if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
            } else {
                executor.shutdownNow();
            }
        }
        executor = null;
    }

    /**
     * Polling loop for one consumer thread: pulls batches from the subscription for as long as
     * the consumer is allowed to run and is not suspended.
     */
    private class PubsubPoller implements Runnable {

        private final String subscriptionFullName;
        private final String threadId;

        PubsubPoller(String id) {
            this.subscriptionFullName = String.format("projects/%s/subscriptions/%s",
                    GooglePubsubConsumer.this.endpoint.getProjectId(),
                    GooglePubsubConsumer.this.endpoint.getDestinationName());
            this.threadId = GooglePubsubConsumer.this.endpoint.getDestinationName() + "-" + "Thread " + id;
        }

        @Override
        public void run() {
            try {
                localLog.debug("Subscribing {} to {}", threadId, subscriptionFullName);

                while (isRunAllowed() && !isSuspendingOrSuspended()) {
                    PullRequest pullRequest = new PullRequest().setMaxMessages(endpoint.getMaxMessagesPerPoll());
                    PullResponse pullResponse;
                    try {
                        localLog.trace("Polling : {}", threadId);
                        pullResponse = GooglePubsubConsumer.this.endpoint
                                .getPubsub()
                                .projects()
                                .subscriptions()
                                .pull(subscriptionFullName, pullRequest)
                                .execute();
                    } catch (SocketTimeoutException ste) {
                        // a timed-out pull is simply retried
                        localLog.trace("Socket timeout : {}", threadId);
                        continue;
                    }

                    List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessages();
                    if (receivedMessages == null) {
                        // NOTE(review): the list is presumably left unset when a pull returns no
                        // messages - confirm against the API client. Without this guard the loop
                        // below throws an NPE and this poller stops.
                        continue;
                    }

                    for (ReceivedMessage receivedMessage : receivedMessages) {
                        deliver(receivedMessage);
                    }
                }
            } catch (Exception e) {
                localLog.error("Requesting messages from PubSub Failed:", e);
                throw wrapRuntimeCamelException(e);
            }
        }

        /** Turns one received message into an exchange and passes it to the processor. */
        private void deliver(ReceivedMessage receivedMessage) {
            PubsubMessage pubsubMessage = receivedMessage.getMessage();

            byte[] body = pubsubMessage.decodeData();

            localLog.trace("Received message ID : {}", pubsubMessage.getMessageId());

            Exchange exchange = endpoint.createExchange();
            exchange.getIn().setBody(body);

            exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, receivedMessage.getAckId());
            exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, pubsubMessage.getMessageId());
            exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, pubsubMessage.getPublishTime());

            if (null != pubsubMessage.getAttributes()) {
                exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributes());
            }

            if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
                exchange.addOnCompletion(GooglePubsubConsumer.this.ackStrategy);
            }

            try {
                processor.process(exchange);
            } catch (Throwable e) {
                // record the failure on the exchange instead of letting it end the polling loop
                exchange.setException(e);
            }
        }
    }
}

http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java new file mode 100644 index 0000000..7336704 --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub; + +import java.util.concurrent.ExecutorService; + +import com.google.api.client.repackaged.com.google.common.base.Strings; +import com.google.api.services.pubsub.Pubsub; +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + /** + * Messaging client for Google Cloud Platform PubSub Service: + * https://cloud.google.com/pubsub/ + * + * Built on top of the Service API libraries (v1). 
+ */ +@UriEndpoint(scheme = "google-pubsub", title = "Google Pubsub", + syntax = "google-pubsub:projectId:destinationName", label = "messaging") +public class GooglePubsubEndpoint extends DefaultEndpoint { + + private Logger log; + + @UriPath(description = "Project Id") + @Metadata(required = "true") + private String projectId; + + @UriPath(description = "Destination Name") + @Metadata(required = "true") + private String destinationName; + + @UriParam(name = "loggerId", description = "Logger ID to use when a match to the parent route required") + private String loggerId; + + @UriParam(name = "concurrentConsumers", description = "The number of parallel streams consuming from the subscription", defaultValue=1) + private Integer concurrentConsumers = 1; + + @UriParam(name = "maxMessagesPerPoll", description = "The max number of messages to receive from the server in a single API call", defaultValue=1) + private Integer maxMessagesPerPoll = 1; + + @UriParam(name = "connectionFactory", description = "ConnectionFactory to obtain connection to PubSub Service. If non provided the default one will be used") + private GooglePubsubConnectionFactory connectionFactory; + + @UriParam(defaultValue = "AUTO", enums = "AUTO,NONE", + description = "AUTO = exchange gets ack'ed/nack'ed on completion. 
NONE = downstream process has to ack/nack explicitly") + private GooglePubsubConstants.AckMode ackMode = GooglePubsubConstants.AckMode.AUTO; + + private Pubsub pubsub; + + public GooglePubsubEndpoint(String uri, Component component, String remaining) { + super(uri, component); + + if (!(component instanceof GooglePubsubComponent)) { + throw new IllegalArgumentException("The component provided is not GooglePubsubComponent : " + component.getClass().getName()); + } + } + + @Override + public GooglePubsubComponent getComponent() { + return (GooglePubsubComponent) super.getComponent(); + } + + public void afterPropertiesSet() throws Exception { + if (Strings.isNullOrEmpty(loggerId)) { + log = LoggerFactory.getLogger(this.getClass().getName()); + } else { + log = LoggerFactory.getLogger(loggerId); + } + + GooglePubsubConnectionFactory cf = (null == connectionFactory) + ? getComponent().getConnectionFactory() + : connectionFactory; + + pubsub = cf.getClient(); + + log.trace("Project ID: {}", this.projectId); + log.trace("Destination Name: {}", this.destinationName); + log.trace("From file : {}", cf.getCredentialsFileLocation()); + } + + public Producer createProducer() throws Exception { + afterPropertiesSet(); + return new GooglePubsubProducer(this); + } + + public Consumer createConsumer(Processor processor) throws Exception { + afterPropertiesSet(); + setExchangePattern(ExchangePattern.InOnly); + return new GooglePubsubConsumer(this, processor); + } + + public ExecutorService createExecutor() { + return getCamelContext() + .getExecutorServiceManager() + .newFixedThreadPool(this, + "GooglePubsubConsumer[" + getDestinationName() + "]", + concurrentConsumers); + } + + public boolean isSingleton() { + return false; + } + + public String getProjectId() { + return projectId; + } + + public void setProjectId(String projectId) { + this.projectId = projectId; + } + + public String getLoggerId() { + return loggerId; + } + + public void setLoggerId(String loggerId) { + 
this.loggerId = loggerId; + } + + public String getDestinationName() { + return destinationName; + } + + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + public Integer getConcurrentConsumers() { + return concurrentConsumers; + } + + public void setConcurrentConsumers(Integer concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + public Integer getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + + public void setMaxMessagesPerPoll(Integer maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + public GooglePubsubConstants.AckMode getAckMode() { + return ackMode; + } + + public void setAckMode(GooglePubsubConstants.AckMode ackMode) { + this.ackMode = ackMode; + } + + public Pubsub getPubsub() { + return pubsub; + } + + public GooglePubsubConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public void setConnectionFactory(GooglePubsubConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f57c33/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java new file mode 100644 index 0000000..aa457ef --- /dev/null +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.google.api.client.util.Strings; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generic PubSub Producer + */ +public class GooglePubsubProducer extends DefaultProducer { + + private Logger logger; + + public GooglePubsubProducer(GooglePubsubEndpoint endpoint) throws Exception { + super(endpoint); + + String loggerId = endpoint.getLoggerId(); + + if (Strings.isNullOrEmpty(loggerId)) { + loggerId = this.getClass().getName(); + } + + logger = LoggerFactory.getLogger(loggerId); + } + + /** + * The incoming message is expected to be either + * - a List of Exchanges (aggregated) + * - an Exchange + */ + @Override + public void process(Exchange exchange) throws Exception { + + List<Exchange> entryList = prepareExchangeList(exchange); + + if (entryList == null || entryList.size() == 0) { + logger.warn("The incoming message is either null or empty. 
Triggered by an aggregation timeout?"); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("uploader thread/id: " + + Thread.currentThread().getId() + + " / " + exchange.getExchangeId() + + " . api call completed."); + } + + sendMessages(entryList); + } + + /** + * The method converts a single incoming message into a List + * + * @param exchange + * @return + */ + private static List<Exchange> prepareExchangeList(Exchange exchange) { + + List<Exchange> entryList = null; + + if (null == exchange.getProperty(Exchange.GROUPED_EXCHANGE)) { + entryList = new ArrayList<>(); + entryList.add(exchange); + } else { + entryList = (List<Exchange>) exchange.getProperty(Exchange.GROUPED_EXCHANGE); + } + + return entryList; + } + + private void sendMessages(List<Exchange> exchanges) throws Exception { + + GooglePubsubEndpoint endpoint = (GooglePubsubEndpoint) getEndpoint(); + String topicName = String.format("projects/%s/topics/%s", endpoint.getProjectId(), endpoint.getDestinationName()); + + List<PubsubMessage> messages = new ArrayList<>(); + + for (Exchange exchange : exchanges) { + PubsubMessage message = new PubsubMessage(); + + Object body = exchange.getIn().getBody(); + + if (body instanceof String) { + message.encodeData(((String) body).getBytes("UTF-8")); + } else if (body instanceof byte[]) { + message.encodeData((byte[]) body); + } else { + message.encodeData(serialize(body)); + } + + Object attributes = exchange.getIn().getHeader(GooglePubsubConstants.ATTRIBUTES); + + if (attributes != null && attributes instanceof Map && ((Map)attributes).size() > 0) { + message.setAttributes((Map)attributes); + } + + messages.add(message); + } + + PublishRequest publishRequest = new PublishRequest().setMessages(messages); + + PublishResponse response = endpoint.getPubsub() + .projects() + .topics() + .publish(topicName, publishRequest) + .execute(); + + List<String> sentMessageIds = response.getMessageIds(); + + int i = 0; + for (Exchange entry : exchanges) { + 
entry.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, sentMessageIds.get(i)); + i++; + } + } + + public static byte[] serialize(Object obj) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream os = new ObjectOutputStream(out); + os.writeObject(obj); + return out.toByteArray(); + } +}