This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 320dd0d CAMEL-15617: Fix filter Google PubSub reserved attributes (#4343) 320dd0d is described below commit 320dd0df9dd362523c0ee8769f87eebf8100b465 Author: Alvin Kwekel <al...@liberition.com> AuthorDate: Wed Oct 7 06:19:24 2020 +0200 CAMEL-15617: Fix filter Google PubSub reserved attributes (#4343) * CAMEL-15617: Upgrade Google PubSub client * CAMEL-15617: Refactored creating test topics and subscriptions to be more extensible * CAMEL-15617: Split creating topic and subscription in tests * CAMEL-15617: Add test to recreate the issue of sending reserved attributes * CAMEL-15617: Remove reserved attributes before sending * CAMEL-15617: Formatting * CAMEL-15617: Update Camel dependencies * CAMEL-15617: Explicit GRPC dependencies * CAMEL-15617: Downgrade PubSub to have GRPC version match the Camel GRPC component Remove obsolete version properties for PubSub Remove dependency exclusions * CAMEL-15617: Update Camel dependencies module * CAMEL-15617: Update Camel dependencies module - forgot grpc * CAMEL-15617: Shutdown cached producers on component shutdown --- camel-dependencies/pom.xml | 6 +- components/camel-google-pubsub/pom.xml | 77 ----------- .../google/pubsub/GooglePubsubComponent.java | 1 + .../google/pubsub/GooglePubsubConstants.java | 1 + .../google/pubsub/GooglePubsubProducer.java | 9 +- .../component/google/pubsub/PubsubTestSupport.java | 95 ++++++++----- .../google/pubsub/integration/DeadLetterTest.java | 154 +++++++++++++++++++++ parent/pom.xml | 6 +- 8 files changed, 229 insertions(+), 120 deletions(-) diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml index 2a3625b..9258fad 100644 --- a/camel-dependencies/pom.xml +++ b/camel-dependencies/pom.xml @@ -228,10 +228,9 @@ <google-api-services-calendar-version>v3-rev291-1.22.0</google-api-services-calendar-version> <google-api-services-drive-version>v2-rev297-1.22.0</google-api-services-drive-version> 
<google-api-services-mail-version>v1-rev81-1.22.0</google-api-services-mail-version> - <google-api-services-pubsub-version>v1-rev12-1.22.0</google-api-services-pubsub-version> <google-api-services-sheets-version>v4-rev551-1.22.0</google-api-services-sheets-version> <google-auto-value-version>1.7</google-auto-value-version> - <google-cloud-pubsub-version>1.102.0</google-cloud-pubsub-version> + <google-cloud-pubsub-version>1.105.0</google-cloud-pubsub-version> <google-errorprone-version>2.3.3</google-errorprone-version> <google-findbugs-annotations2-version>2.0.3</google-findbugs-annotations2-version> <google-findbugs-jsr305-version>3.0.2</google-findbugs-jsr305-version> @@ -244,7 +243,6 @@ <google-maps-services-version>0.10.1</google-maps-services-version> <google-proto-common-version>1.17.0</google-proto-common-version> <google-proto-iam-version>0.13.0</google-proto-iam-version> - <google-proto-pubsub-version>1.84.0</google-proto-pubsub-version> <google-pubsub-guava-version>28.2-jre</google-pubsub-guava-version> <graphql-java-version>14.0</graphql-java-version> <grizzly-websockets-version>2.3.25</grizzly-websockets-version> @@ -252,7 +250,7 @@ <grpc-guava-version>28.2-jre</grpc-guava-version> <grpc-java-jwt-version>3.7.0</grpc-java-jwt-version> <grpc-netty-tcnative-boringssl-static-version>2.0.28.Final</grpc-netty-tcnative-boringssl-static-version> - <grpc-version>1.28.0</grpc-version> + <grpc-version>1.28.1</grpc-version> <gson-version>2.8.5</gson-version> <guava-eventbus-version>28.2-jre</guava-eventbus-version> <guice3-version>3.0</guice3-version> diff --git a/components/camel-google-pubsub/pom.xml b/components/camel-google-pubsub/pom.xml index 10dd52e..54a76a7 100644 --- a/components/camel-google-pubsub/pom.xml +++ b/components/camel-google-pubsub/pom.xml @@ -47,84 +47,8 @@ <groupId>com.google.cloud</groupId> <artifactId>google-cloud-pubsub</artifactId> <version>${google-cloud-pubsub-version}</version> - <exclusions> - <exclusion> - <groupId>io.grpc</groupId> - 
<artifactId>grpc-api</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-alts</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-auth</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-context</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-core</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-netty-shaded</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-stub</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>grpc-protobuf</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-api</artifactId> - <version>${grpc-version}</version> - </dependency> - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-alts</artifactId> - <version>${grpc-version}</version> </dependency> <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-auth</artifactId> - <version>${grpc-version}</version> - </dependency> - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-context</artifactId> - <version>${grpc-version}</version> - </dependency> - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-core</artifactId> - <version>${grpc-version}</version> - </dependency> - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-netty-shaded</artifactId> - <version>${grpc-version}</version> - </dependency> - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-stub</artifactId> - <version>${grpc-version}</version> - </dependency> - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-protobuf</artifactId> - <version>${grpc-version}</version> - </dependency> - - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${google-pubsub-guava-version}</version> @@ -140,5 +64,4 @@ 
<scope>test</scope> </dependency> </dependencies> - </project> diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java index 8996292..fcd6445 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java @@ -112,6 +112,7 @@ public class GooglePubsubComponent extends DefaultComponent { @Override protected void doShutdown() throws Exception { cachedPublishers.cleanUp(); + cachedPublishers.invalidateAll(); super.doShutdown(); } diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java index 444d56d..17e625a 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java @@ -22,6 +22,7 @@ public final class GooglePubsubConstants { public static final String ACK_ID = "CamelGooglePubsub.MsgAckId"; public static final String PUBLISH_TIME = "CamelGooglePubsub.PublishTime"; public static final String ATTRIBUTES = "CamelGooglePubsub.Attributes"; + public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX = "goog"; public enum AckMode { AUTO, diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java index b779d45..123f130 100644 --- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java @@ -32,6 +32,8 @@ import org.apache.camel.support.DefaultProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.component.google.pubsub.GooglePubsubConstants.RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX; + /** * Generic PubSub Producer */ @@ -97,10 +99,13 @@ public class GooglePubsubProducer extends DefaultProducer { } PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(byteString); - Map<String, String> attributes = exchange.getIn().getHeader(GooglePubsubConstants.ATTRIBUTES, Map.class); if (attributes != null) { - messageBuilder.putAllAttributes(attributes).build(); + for (Map.Entry<String, String> attribute : attributes.entrySet()) { + if (!attribute.getKey().startsWith(RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX)) { + messageBuilder.putAttributes(attribute.getKey(), attribute.getValue()); + } + } } PubsubMessage message = messageBuilder.build(); diff --git a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java index a8dd786..165f196 100644 --- a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java +++ b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.google.pubsub; +import java.io.IOException; import java.io.InputStream; import java.util.Properties; @@ -28,8 +29,9 @@ import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.pubsub.v1.TopicAdminSettings; import 
com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.ProjectTopicName; -import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.camel.BindToRegistry; @@ -103,50 +105,77 @@ public class PubsubTestSupport extends ContainerAwareTestSupport { } public void createTopicSubscriptionPair(String topicName, String subscriptionName, int ackDeadlineSeconds) { - ManagedChannel channel = null; - TopicAdminClient topicAdminClient = null; - SubscriptionAdminClient subscriptionAdminClient = null; + TopicName projectTopicName = TopicName.of(PROJECT_ID, topicName); + ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionName); - try { - Integer port = container.getFirstMappedPort(); - channel = ManagedChannelBuilder - .forTarget(String.format("%s:%s", "localhost", port)) - .usePlaintext() - .build(); + Topic topic = Topic.newBuilder().setName(projectTopicName.toString()).build(); + Subscription subscription = Subscription.newBuilder() + .setName(projectSubscriptionName.toString()) + .setTopic(topic.getName()) + .setAckDeadlineSeconds(ackDeadlineSeconds) + .build(); + + createTopicSubscriptionPair(topic, subscription); + } + + public void createTopicSubscriptionPair(Topic topic, Subscription subscription) { + createTopic(topic); + createSubscription(subscription); + } + + public void createTopic(Topic topic) { + TopicAdminClient topicAdminClient = createTopicAdminClient(); + + topicAdminClient.createTopic(topic); + + topicAdminClient.shutdown(); + } + + public void createSubscription(Subscription subscription) { + SubscriptionAdminClient subscriptionAdminClient = createSubscriptionAdminClient(); + + subscriptionAdminClient.createSubscription(subscription); - FixedTransportChannelProvider channelProvider - = 
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); - CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); + subscriptionAdminClient.shutdown(); + } + + private FixedTransportChannelProvider createChannelProvider() { + Integer port = container.getFirstMappedPort(); + ManagedChannel channel = ManagedChannelBuilder + .forTarget(String.format("%s:%s", "localhost", port)) + .usePlaintext() + .build(); + + return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + } - ProjectTopicName projectTopicName = ProjectTopicName.of(PROJECT_ID, topicName); - ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionName); + private TopicAdminClient createTopicAdminClient() { + FixedTransportChannelProvider channelProvider = createChannelProvider(); + CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); - topicAdminClient = TopicAdminClient.create( + try { + return TopicAdminClient.create( TopicAdminSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .build()); - topicAdminClient.createTopic(projectTopicName); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private SubscriptionAdminClient createSubscriptionAdminClient() { + FixedTransportChannelProvider channelProvider = createChannelProvider(); + CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); - subscriptionAdminClient = SubscriptionAdminClient.create( + try { + return SubscriptionAdminClient.create( SubscriptionAdminSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) .build()); - subscriptionAdminClient.createSubscription(projectSubscriptionName, projectTopicName, - PushConfig.getDefaultInstance(), ackDeadlineSeconds); - - } catch (Exception ignored) { - } finally { - if (channel != null) { - channel.shutdown(); - } - if 
(topicAdminClient != null) { - topicAdminClient.shutdown(); - } - if (subscriptionAdminClient != null) { - subscriptionAdminClient.shutdown(); - } + } catch (IOException e) { + throw new RuntimeException(e); } } } diff --git a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/DeadLetterTest.java b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/DeadLetterTest.java new file mode 100644 index 0000000..35674a9 --- /dev/null +++ b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/DeadLetterTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.pubsub.integration; + +import com.google.pubsub.v1.DeadLetterPolicy; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Test; + +public class DeadLetterTest extends PubsubTestSupport { + + private static final String INPUT_TOPIC_NAME = "camel.input-topic"; + private static final String INPUT_SUBSCRIPTION_NAME = "camel.input-topic-subscription"; + private static final String OUTPUT_TOPIC_NAME = "camel.output-topic"; + private static final String OUTPUT_SUBSCRIPTION_NAME = "camel.output-topic-subscription"; + private static final String DEAD_LETTER_TOPIC_NAME = "camel.dead-letter-topic"; + private static final String DEAD_LETTER_SUBSCRIPTION_NAME = "camel.dead-letter-topic-subscription"; + private static int count = 1; + + @EndpointInject("google-pubsub:{{project.id}}:" + INPUT_SUBSCRIPTION_NAME) + private Endpoint inputPubSubSubscription; + + @EndpointInject("google-pubsub:{{project.id}}:" + OUTPUT_TOPIC_NAME) + private Endpoint outputPubsubTopic; + + @EndpointInject("google-pubsub:{{project.id}}:" + OUTPUT_SUBSCRIPTION_NAME) + private Endpoint outputPubsubSubscription; + + @EndpointInject("google-pubsub:{{project.id}}:" + DEAD_LETTER_SUBSCRIPTION_NAME) + private Endpoint deadLetterPubsubSubscription; + + @EndpointInject("mock:input") + private MockEndpoint inputMock; + + @EndpointInject("mock:dead-letter") + private MockEndpoint deadLetterMock; + + 
@EndpointInject("mock:output") + private MockEndpoint outputMock; + + @Produce("google-pubsub:{{project.id}}:" + INPUT_TOPIC_NAME) + private ProducerTemplate producer; + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from(inputPubSubSubscription) + .routeId("receiver") + .to(inputMock) + .process(e -> { + if (count < 3) { + count = ++count; + throw new Exception("Redeliver please"); + } + }) + .to(outputPubsubTopic); + + from(outputPubsubSubscription) + .routeId("output") + .to(outputMock); + + from(deadLetterPubsubSubscription) + .routeId("dead-letter") + .to(deadLetterMock); + } + }; + } + + @Override + public void createTopicSubscription() { + TopicName projectInputTopicName = TopicName.of(PROJECT_ID, INPUT_TOPIC_NAME); + TopicName projectOutputTopicName = TopicName.of(PROJECT_ID, OUTPUT_TOPIC_NAME); + TopicName projectDeadLetterTopicName = TopicName.of(PROJECT_ID, DEAD_LETTER_TOPIC_NAME); + ProjectSubscriptionName projectInputSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, INPUT_SUBSCRIPTION_NAME); + ProjectSubscriptionName projectOutputSubscriptionName + = ProjectSubscriptionName.of(PROJECT_ID, OUTPUT_SUBSCRIPTION_NAME); + ProjectSubscriptionName projectDeadLetterSubscriptionName + = ProjectSubscriptionName.of(PROJECT_ID, DEAD_LETTER_SUBSCRIPTION_NAME); + + Topic inputTopic = Topic.newBuilder().setName(projectInputTopicName.toString()).build(); + Topic outputTopic = Topic.newBuilder().setName(projectOutputTopicName.toString()).build(); + Topic deadLetterTopic = Topic.newBuilder().setName(projectDeadLetterTopicName.toString()).build(); + Subscription inputSubscription = Subscription.newBuilder() + .setName(projectInputSubscriptionName.toString()) + .setTopic(inputTopic.getName()) + .setDeadLetterPolicy(DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(deadLetterTopic.toString()) + .setMaxDeliveryAttempts(5).build()) + .build(); + Subscription deadLetterSubscription = 
Subscription.newBuilder() + .setName(projectDeadLetterSubscriptionName.toString()) + .setTopic(deadLetterTopic.getName()) + .build(); + Subscription outputSubscription = Subscription.newBuilder() + .setName(projectOutputSubscriptionName.toString()) + .setTopic(outputTopic.getName()) + .build(); + + createTopicSubscriptionPair(deadLetterTopic, deadLetterSubscription); + createTopicSubscriptionPair(inputTopic, inputSubscription); + createTopicSubscriptionPair(outputTopic, outputSubscription); + } + + /** + * Expecting the route to, on the third attempt, send the message to PubSub without the "googclient_deliveryattempt" + * attribute. This attribute is set when a message gets redelivered, but it is not allowed to be set when sending. + * The PubSub emulator currently doesn't support dead letter topics so this test is only representative when run + * against the Google Cloud PubSub. + * + * @throws InterruptedException + */ + @Test + public void redeliverAndSend() throws InterruptedException { + + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody(exchange.getExchangeId()); + + inputMock.expectedMessageCount(3); + deadLetterMock.expectedMessageCount(0); + outputMock.expectedMessageCount(1); + + producer.send(exchange); + + outputMock.assertIsSatisfied(2000); + inputMock.assertIsSatisfied(); + deadLetterMock.assertIsSatisfied(); + } +} diff --git a/parent/pom.xml b/parent/pom.xml index 3424838..778c01d 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -214,8 +214,7 @@ <google-api-services-sheets-version>v4-rev551-1.22.0</google-api-services-sheets-version> <google-api-services-mail-version>v1-rev81-1.22.0</google-api-services-mail-version> <google-api-services-bigquery-version>v2-rev352-1.22.0</google-api-services-bigquery-version> - <google-api-services-pubsub-version>v1-rev12-1.22.0</google-api-services-pubsub-version> - <google-cloud-pubsub-version>1.102.0</google-cloud-pubsub-version> + 
<google-cloud-pubsub-version>1.105.0</google-cloud-pubsub-version> <google-errorprone-version>2.3.3</google-errorprone-version> <google-gax-version>1.50.1</google-gax-version> <google-http-client-version>1.33.0</google-http-client-version> @@ -223,11 +222,10 @@ <google-j2objc-version>1.3</google-j2objc-version> <google-mail-guava-version>17.0</google-mail-guava-version> <google-proto-common-version>1.17.0</google-proto-common-version> - <google-proto-pubsub-version>1.84.0</google-proto-pubsub-version> <google-proto-iam-version>0.13.0</google-proto-iam-version> <graphql-java-version>14.0</graphql-java-version> <grizzly-websockets-version>2.3.25</grizzly-websockets-version> - <grpc-version>1.28.0</grpc-version> + <grpc-version>1.28.1</grpc-version> <grpc-google-auth-library-version>0.19.0</grpc-google-auth-library-version> <grpc-guava-version>28.2-jre</grpc-guava-version> <grpc-java-jwt-version>3.7.0</grpc-java-jwt-version>