This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 95a2dea  CAMEL-16222: PooledExchangeFactory experiment
95a2dea is described below

commit 95a2deacb6965a7ae3a5ea873ffd8d75e2063d84
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 23 09:40:19 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/pulsar/PulsarMessageListener.java    | 42 ++++++++--------------
 1 file changed, 14 insertions(+), 28 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 0607bda..9038c35 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.pulsar;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
@@ -24,13 +23,9 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class PulsarMessageListener implements MessageListener<byte[]> {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarMessageListener.class);
-
     private final PulsarEndpoint endpoint;
     private final PulsarConsumer pulsarConsumer;
 
@@ -41,33 +36,32 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
 
     @Override
     public void received(final Consumer<byte[]> consumer, final 
Message<byte[]> message) {
-        final Exchange exchange = PulsarMessageUtils.updateExchange(message, 
endpoint.createExchange());
+        final Exchange exchange = PulsarMessageUtils.updateExchange(message, 
pulsarConsumer.createExchange(false));
 
-        try {
-            if 
(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
-                
exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
-                        
endpoint.getComponent().getPulsarMessageReceiptFactory()
-                                .newInstance(exchange, message, consumer));
-            }
-            processAsync(exchange, consumer, message);
-        } catch (Exception exception) {
-            handleProcessorException(exchange, exception);
+        if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
+            exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
+                    endpoint.getComponent().getPulsarMessageReceiptFactory()
+                            .newInstance(exchange, message, consumer));
         }
+        processAsync(exchange, consumer, message);
     }
 
     private void processAsync(final Exchange exchange, final Consumer<byte[]> 
consumer, final Message<byte[]> message) {
-        pulsarConsumer.getAsyncProcessor().process(exchange, new 
AsyncCallback() {
-            @Override
-            public void done(boolean doneSync) {
+        pulsarConsumer.getAsyncProcessor().process(exchange, doneSync -> {
+            try {
                 if (exchange.getException() != null) {
-                    handleProcessorException(exchange, 
exchange.getException());
+                    
pulsarConsumer.getExceptionHandler().handleException("Error processing 
exchange", exchange,
+                            exchange.getException());
                 } else {
                     try {
                         acknowledge(consumer, message);
                     } catch (Exception e) {
-                        handleProcessorException(exchange, e);
+                        
pulsarConsumer.getExceptionHandler().handleException("Error processing 
exchange", exchange,
+                                exchange.getException());
                     }
                 }
+            } finally {
+                pulsarConsumer.releaseExchange(exchange, false);
             }
         });
     }
@@ -79,12 +73,4 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
         }
     }
 
-    private void handleProcessorException(final Exchange exchange, final 
Exception exception) {
-        final Exchange exchangeWithException = PulsarMessageUtils
-                .updateExchangeWithException(exception, exchange);
-
-        pulsarConsumer.getExceptionHandler()
-                .handleException("An error occurred", exchangeWithException, 
exception);
-    }
-
 }

Reply via email to