This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 8bfdd18 Initialized subscribers in GooglePubsubConsumer (#3716) 8bfdd18 is described below commit 8bfdd185816912316c5f2672f7cde5f36772c6a6 Author: Taras <tarassuhove...@gmail.com> AuthorDate: Wed Apr 8 07:24:38 2020 +0300 Initialized subscribers in GooglePubsubConsumer (#3716) --- .../google/pubsub/GooglePubsubConsumer.java | 28 +++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index 0a00a38..74ff6f4 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.google.pubsub; import java.io.IOException; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -52,6 +53,7 @@ class GooglePubsubConsumer extends DefaultConsumer { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; + this.subscribers = new LinkedList<>(); String loggerId = endpoint.getLoggerId(); @@ -96,20 +98,24 @@ class GooglePubsubConsumer extends DefaultConsumer { @Override public void run() { - String subscriptionName = ProjectSubscriptionName.format(endpoint.getProjectId(), endpoint.getDestinationName()); + try { + String subscriptionName = ProjectSubscriptionName.format(endpoint.getProjectId(), endpoint.getDestinationName()); - if (localLog.isDebugEnabled()) { - localLog.debug("Subscribing to {}", subscriptionName); - } + if (localLog.isDebugEnabled()) { + localLog.debug("Subscribing to {}", subscriptionName); + } - if (endpoint.isSynchronousPull()) { - synchronousPull(subscriptionName); - } else { - asynchronousPull(subscriptionName); - } + if (endpoint.isSynchronousPull()) { + synchronousPull(subscriptionName); + } else { + asynchronousPull(subscriptionName); + } - localLog.debug("Exit run for subscription {}", subscriptionName); + localLog.debug("Exit run for subscription {}", subscriptionName); + } catch (Exception e) { + localLog.error("Failure getting messages from PubSub", e); + } } private void asynchronousPull(String subscriptionName) { @@ -117,8 +123,8 @@ class GooglePubsubConsumer extends DefaultConsumer { MessageReceiver messageReceiver = new CamelMessageReceiver(endpoint, processor); Subscriber subscriber = endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver); - subscribers.add(subscriber); try { + subscribers.add(subscriber); subscriber.startAsync().awaitRunning(); subscriber.awaitTerminated(); } catch (Exception e) {