This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push: new b2b00a39d28 CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account (#8667) b2b00a39d28 is described below commit b2b00a39d28ca05ccaa99b55fd2641cc8048ee5b Author: JiriOndrusek <ondrusek.j...@gmail.com> AuthorDate: Sat Nov 5 11:00:22 2022 +0100 CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with real account (#8667) --- .../camel/component/google/pubsub/GooglePubsubConsumer.java | 6 +++++- .../component/google/pubsub/consumer/AcknowledgeSync.java | 10 ++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index e639e400e22..ef3dc08a443 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -192,8 +192,12 @@ public class GooglePubsubConsumer extends DefaultConsumer { } if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { + //existing subscriber can not be propagated, because it will be closed at the end of this block + //subscriber will be created at the moment of use + // (see https://issues.apache.org/jira/browse/CAMEL-18447) exchange.adapt(ExtendedExchange.class) - .addOnCompletion(new AcknowledgeSync(subscriber, subscriptionName)); + .addOnCompletion(new AcknowledgeSync( + () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName)); } try { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java index df4498607d8..0c2fa0528f3 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java @@ -18,6 +18,7 @@ package org.apache.camel.component.google.pubsub.consumer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.pubsub.v1.AcknowledgeRequest; @@ -28,11 +29,12 @@ import org.apache.camel.spi.Synchronization; public class AcknowledgeSync implements Synchronization { - private final SubscriberStub subscriber; + //Supplier cannot be used because of thrown exception (Callback used instead) + private final Callable<SubscriberStub> subscriberStubSupplier; private final String subscriptionName; - public AcknowledgeSync(SubscriberStub subscriber, String subscriptionName) { - this.subscriber = subscriber; + public AcknowledgeSync(Callable<SubscriberStub> subscriberStubSupplier, String subscriptionName) { + this.subscriberStubSupplier = subscriberStubSupplier; this.subscriptionName = subscriptionName; } @@ -41,7 +43,7 @@ public class AcknowledgeSync implements Synchronization { AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder() .addAllAckIds(getAckIdList(exchange)) .setSubscription(subscriptionName).build(); - try { + try (SubscriberStub subscriber = subscriberStubSupplier.call()) { subscriber.acknowledgeCallable().call(ackRequest); } catch (Exception e) { throw new RuntimeCamelException(e);