This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 95a2dea  CAMEL-16222: PooledExchangeFactory experiment
95a2dea is described below

commit 95a2deacb6965a7ae3a5ea873ffd8d75e2063d84
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 23 09:40:19 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/pulsar/PulsarMessageListener.java    | 42 ++++++++--------------
 1 file changed, 14 insertions(+), 28 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 0607bda..9038c35 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.pulsar;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
@@ -24,13 +23,9 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class PulsarMessageListener implements MessageListener<byte[]> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageListener.class);
-
     private final PulsarEndpoint endpoint;
     private final PulsarConsumer pulsarConsumer;
 
@@ -41,33 +36,32 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
 
     @Override
     public void received(final Consumer<byte[]> consumer, final Message<byte[]> message) {
-        final Exchange exchange = PulsarMessageUtils.updateExchange(message, endpoint.createExchange());
+        final Exchange exchange = PulsarMessageUtils.updateExchange(message, pulsarConsumer.createExchange(false));
 
-        try {
-            if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
-                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
-                        endpoint.getComponent().getPulsarMessageReceiptFactory()
-                                .newInstance(exchange, message, consumer));
-            }
-            processAsync(exchange, consumer, message);
-        } catch (Exception exception) {
-            handleProcessorException(exchange, exception);
+        if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
+            exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
+                    endpoint.getComponent().getPulsarMessageReceiptFactory()
+                            .newInstance(exchange, message, consumer));
         }
+        processAsync(exchange, consumer, message);
     }
 
     private void processAsync(final Exchange exchange, final Consumer<byte[]> consumer, final Message<byte[]> message) {
-        pulsarConsumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
-            @Override
-            public void done(boolean doneSync) {
+        pulsarConsumer.getAsyncProcessor().process(exchange, doneSync -> {
+            try {
                 if (exchange.getException() != null) {
-                    handleProcessorException(exchange, exchange.getException());
+                    pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange,
+                            exchange.getException());
                 } else {
                     try {
                         acknowledge(consumer, message);
                     } catch (Exception e) {
-                        handleProcessorException(exchange, e);
+                        pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange,
+                                exchange.getException());
                     }
                 }
+            } finally {
+                pulsarConsumer.releaseExchange(exchange, false);
             }
         });
     }
@@ -79,12 +73,4 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
         }
     }
 
-    private void handleProcessorException(final Exchange exchange, final Exception exception) {
-        final Exchange exchangeWithException = PulsarMessageUtils
-                .updateExchangeWithException(exception, exchange);
-
-        pulsarConsumer.getExceptionHandler()
-                .handleException("An error occurred", exchangeWithException, exception);
-    }
-
 }