This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new b2b00a39d28 CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account (#8667)
b2b00a39d28 is described below

commit b2b00a39d28ca05ccaa99b55fd2641cc8048ee5b
Author: JiriOndrusek <ondrusek.j...@gmail.com>
AuthorDate: Sat Nov 5 11:00:22 2022 +0100

    CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account (#8667)
---
 .../camel/component/google/pubsub/GooglePubsubConsumer.java    |  6 +++++-
 .../component/google/pubsub/consumer/AcknowledgeSync.java      | 10 ++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index e639e400e22..ef3dc08a443 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -192,8 +192,12 @@ public class GooglePubsubConsumer extends DefaultConsumer {
                         }
 
                         if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+                            //existing subscriber can not be propagated, because it will be closed at the end of this block
+                            //subscriber will be created at the moment of use
+                            // (see  https://issues.apache.org/jira/browse/CAMEL-18447)
                             exchange.adapt(ExtendedExchange.class)
-                                    .addOnCompletion(new AcknowledgeSync(subscriber, subscriptionName));
+                                    .addOnCompletion(new AcknowledgeSync(
+                                            () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName));
                         }
 
                         try {
diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
index df4498607d8..0c2fa0528f3 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.google.pubsub.consumer;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import com.google.cloud.pubsub.v1.stub.SubscriberStub;
 import com.google.pubsub.v1.AcknowledgeRequest;
@@ -28,11 +29,12 @@ import org.apache.camel.spi.Synchronization;
 
 public class AcknowledgeSync implements Synchronization {
 
-    private final SubscriberStub subscriber;
+    //Supplier cannot be used because of thrown exception (Callback used instead)
+    private final Callable<SubscriberStub> subscriberStubSupplier;
     private final String subscriptionName;
 
-    public AcknowledgeSync(SubscriberStub subscriber, String subscriptionName) {
-        this.subscriber = subscriber;
+    public AcknowledgeSync(Callable<SubscriberStub> subscriberStubSupplier, String subscriptionName) {
+        this.subscriberStubSupplier = subscriberStubSupplier;
         this.subscriptionName = subscriptionName;
     }
 
@@ -41,7 +43,7 @@ public class AcknowledgeSync implements Synchronization {
         AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder()
                 .addAllAckIds(getAckIdList(exchange))
                 .setSubscription(subscriptionName).build();
-        try {
+        try (SubscriberStub subscriber = subscriberStubSupplier.call()) {
             subscriber.acknowledgeCallable().call(ackRequest);
         } catch (Exception e) {
             throw new RuntimeCamelException(e);

Reply via email to