This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bfdd18  Initialized subscribers in GooglePubsubConsumer (#3716)
8bfdd18 is described below

commit 8bfdd185816912316c5f2672f7cde5f36772c6a6
Author: Taras <tarassuhove...@gmail.com>
AuthorDate: Wed Apr 8 07:24:38 2020 +0300

    Initialized subscribers in GooglePubsubConsumer (#3716)
---
 .../google/pubsub/GooglePubsubConsumer.java        | 28 +++++++++++++---------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 0a00a38..74ff6f4 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.google.pubsub;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -52,6 +53,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
+        this.subscribers = new LinkedList<>();
 
         String loggerId = endpoint.getLoggerId();
 
@@ -96,20 +98,24 @@ class GooglePubsubConsumer extends DefaultConsumer {
 
         @Override
         public void run() {
-            String subscriptionName = ProjectSubscriptionName.format(endpoint.getProjectId(), endpoint.getDestinationName());
+            try {
+                String subscriptionName = ProjectSubscriptionName.format(endpoint.getProjectId(), endpoint.getDestinationName());
 
-            if (localLog.isDebugEnabled()) {
-                localLog.debug("Subscribing to {}", subscriptionName);
-            }
+                if (localLog.isDebugEnabled()) {
+                    localLog.debug("Subscribing to {}", subscriptionName);
+                }
 
 
-            if (endpoint.isSynchronousPull()) {
-                synchronousPull(subscriptionName);
-            } else {
-                asynchronousPull(subscriptionName);
-            }
+                if (endpoint.isSynchronousPull()) {
+                    synchronousPull(subscriptionName);
+                } else {
+                    asynchronousPull(subscriptionName);
+                }
 
-            localLog.debug("Exit run for subscription {}", subscriptionName);
+                localLog.debug("Exit run for subscription {}", subscriptionName);
+            } catch (Exception e) {
+                localLog.error("Failure getting messages from PubSub", e);
+            }
         }
 
         private void asynchronousPull(String subscriptionName) {
@@ -117,8 +123,8 @@ class GooglePubsubConsumer extends DefaultConsumer {
                 MessageReceiver messageReceiver = new CamelMessageReceiver(endpoint, processor);
 
                 Subscriber subscriber = endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver);
-                subscribers.add(subscriber);
                 try {
+                    subscribers.add(subscriber);
                     subscriber.startAsync().awaitRunning();
                     subscriber.awaitTerminated();
                 } catch (Exception e) {

Reply via email to